diff --git a/Cargo.toml b/Cargo.toml index f0923fcdfc081df12c80e4b7f4b03e13e2320cb3..05e20ebefe686c2a6b1bfc5f1af73e3f51b3d699 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,4 +23,34 @@ hash = { path = "../pi_lib/hash" } tcp = { path = "../pi_net/tcp" } ws = { path = "../pi_net/ws" } pi_serv_ext = { path = "../pi_serv_ext"} -pi_serv_lib = { path = "../pi_serv_lib" } \ No newline at end of file +pi_serv_lib = { path = "../pi_serv_lib" } + +rusty_v8 = { path = "../rusty_v8" } +libc = "*" +wild = "2.0.2" +fnv = "*" +magnetic = "2.0.0" +toml = "0.4.6" +json = "0.11.13" +rand = "0.5.2" +mqtt3 = { git = "https://github.com/tekjar/mqtt3" } +hash_value = { path = "../pi_math/hash_value" } +pi_db = { path = "../pi_db" } +net = { path = "../pi_net/net" } +mqtt_tmp = { path = "../pi_net/mqtt_tmp" } +rpc_tmp = { path = "../pi_net/rpc_tmp" } +http = { path = "../pi_net/http"} +httpc = { path = "../pi_net/httpc" } +https = { path = "../pi_net/https" } +https_external = { package = "http", version = "0.2" } +mqtt = { path = "../pi_net/mqtt" } +mqtt_proxy = { path = "../pi_net/mqtt_proxy"} +atom = { path = "../pi_lib/atom" } +worker = { path = "../pi_lib/worker" } +handler = { path = "../pi_lib/handler" } +file = { path = "../pi_lib/file" } +gray = {path = "../pi_lib/gray" } +guid = {path = "../pi_lib/guid" } +guid64 = {path = "../pi_lib/guid64" } +util = {path = "../pi_lib/util" } +timer = {path = "../pi_lib/timer" } \ No newline at end of file diff --git a/src/js_net.rs b/src/js_net.rs new file mode 100644 index 0000000000000000000000000000000000000000..90c3607fea67ee6c04b90666c06993d7b4ffac03 --- /dev/null +++ b/src/js_net.rs @@ -0,0 +1,1366 @@ +use std::cell::RefCell; +use std::env; +use std::io::{Error, ErrorKind}; +use std::net::SocketAddr; +use std::sync::RwLock as StdRwlock; +use std::sync::{Arc, Mutex}; + +use fnv::FnvHashMap; +use fnv::FnvHashSet; +use futures::future::BoxFuture; +use mqtt3; +use parking_lot::RwLock; +use ws::server::WebsocketListenerFactory; + +use atom::Atom; +use gray::{GrayTab, GrayVersion}; +use handler::{Args, Handler, SGenType}; +use https_external::header::HeaderMap; +use mqtt::server::{ + add_topic, publish_topic, register_listener, register_service, WsMqttBrokerFactory, + WssMqttBrokerFactory, +}; +use mqtt::util::AsyncResult; +use mqtt_proxy::service::{MqttConnectHandle, MqttEvent, MqttProxyListener, MqttProxyService}; +use mqtt_tmp::data::Server; +use mqtt_tmp::server::{ClientStub, ServerNode}; +use mqtt_tmp::session::Session; +use net::api::{NetManager, TlsManager}; +use net::api::{Socket, Stream}; +use std::io::Result as IOResult; +use tcp::buffer_pool::WriteBufferPool; +use tcp::connect::TcpSocket; +use tcp::driver::{ + AsyncIOWait, AsyncServiceFactory, Socket as SocketTrait, SocketConfig, Stream as StreamTrait, +}; +use tcp::server::{AsyncPortsFactory, AsyncWaitsHandle, SocketListener}; +use tcp::tls_connect::TlsSocket as FTlsSocket; +use tcp::util::{close_socket, TlsConfig}; + +use rusty_v8 as v8; +use vm_builtin::buffer::NativeArrayBuffer; +use vm_builtin::process::{process_close, process_send, process_spawn, Pid, ProcessMsg}; +use vm_builtin::ContextHandle; +use vm_core::vm::{send_to_process, Vm}; + +use crate::create_init_vm; +use hash::XHashMap; +use http::batch_load::BatchLoad; +use http::cors_handler::CORSHandler; +use http::default_parser::DefaultParser; +use http::file_load::FileLoad; +use http::files_load::FilesLoad; +use http::middleware::{Middleware, MiddlewareChain, MiddlewareResult}; +use http::multi_parts::MutilParts; +use http::port::HttpPort; +use http::range_load::RangeLoad; +use http::request::HttpRequest; +use http::response::{HttpResponse, ResponseHandler}; +use http::route::HttpRoute; +use http::server::HttpListenerFactory; +use http::static_cache::StaticCache; +use http::upload::UploadFile; +use http::virtual_host::VirtualHostPool; +use http::virtual_host::{VirtualHost, VirtualHostTab}; +use pi_serv_lib::js_net::MqttConnection; +use pi_serv_lib::{set_pi_serv_handle, PiServNetHandle}; + +lazy_static! { + // static ref HTTP_ENDPOINT: Arc<RwLock<FnvHashMap<String, String>>> = Arc::new(RwLock::new(FnvHashMap::default())); + // https + static ref SECURE_SERVICES: Arc<RwLock<Vec<SecureServices>>> = Arc::new(RwLock::new(vec![])); + // http + static ref INSECURE_SERVICES: Arc<RwLock<Vec<InsecureServices>>> = Arc::new(RwLock::new(vec![])); + // æ¯ä¸ªç«¯å£çš„è¯ä¹¦é…ç½® (port, (cert_path, priv_key_path)) + static ref CERTIFICATES: Arc<RwLock<FnvHashMap<u16, (String, String)>>> = Arc::new(RwLock::new(FnvHashMap::default())); + // httpsé…ç½® + static ref SECURE_HTTP_CONFIGS: Arc<RwLock<FnvHashMap<u16, Vec<HttpConfig>>>> = Arc::new(RwLock::new(FnvHashMap::default())); + // httpé…ç½® + static ref INSECURE_HTTP_CONFIGS: Arc<RwLock<FnvHashMap<u16, Vec<HttpConfig>>>> = Arc::new(RwLock::new(FnvHashMap::default())); + // static ref BROKER_TOPICS: Arc<RwLock<FnvHashMap<String, FnvHashSet<String>>>> = Arc::new(RwLock::new(FnvHashMap::default())); + // mqttå称绑定listenerPID + static ref BUILD_LISTENER_TAB: Arc<RwLock<FnvHashMap<String, Pid>>> = Arc::new(RwLock::new(FnvHashMap::default())); +} + +// 注册pi_ser方法 +pub fn reg_pi_serv_handle() { + let pi_serv_handle = PiServNetHandle { + bind_mqtt_tcp_port: bind_mqtt_tcp_port, + start_network_services: start_network_services, + parse_http_config: parse_http_config, + config_certificate: config_certificate, + }; + // 注入pi_ser_net方法到pi_serv_lib + set_pi_serv_handle(pi_serv_handle); +} + + +struct InsecureServices( + ( + u16, + Box< + dyn AsyncServiceFactory< + Connect = TcpSocket, + Waits = AsyncWaitsHandle, + Out = (), + Future = BoxFuture<'static, ()>, + >, + >, + ), +); +struct SecureServices( + ( + u16, + Box< + dyn AsyncServiceFactory< + Connect = FTlsSocket, + Waits = AsyncWaitsHandle, + Out = (), + Future = BoxFuture<'static, ()>, + >, + >, + ), +); + +unsafe impl Send for InsecureServices {} +unsafe impl Sync for InsecureServices {} + +unsafe impl Send for SecureServices {} +unsafe impl Sync for SecureServices {} + +fn get_pid(broker_name: &String) -> Pid { + BUILD_LISTENER_TAB.read().get(broker_name).cloned().unwrap() +} + +// Mqtt连接和关é—è¿žæŽ¥å¤„ç† +#[derive(Clone)] +struct MqttConnectHandler {} + +impl MqttConnectHandler { + pub fn new() -> Self { + MqttConnectHandler {} + } +} + +impl Handler for MqttConnectHandler { + type A = MqttEvent; + type B = (); + type C = (); + type D = (); + type E = (); + type F = (); + type G = (); + type H = (); + type HandleResult = (); + + fn handle( + &self, + env: Arc<dyn GrayVersion>, + topic: Atom, + args: Args<Self::A, Self::B, Self::C, Self::D, Self::E, Self::F, Self::G, Self::H>, + ) -> Self::HandleResult { + let connect = unsafe { Arc::from_raw(Arc::into_raw(env) as *const MqttConnectHandle) }; + let connect_id = connect.get_id(); + let port = connect.get_local_port().unwrap(); + match args { + Args::OneArgs(MqttEvent::Connect( + socket_id, + broker_name, + client_id, + keep_alive, + is_clean_session, + user, + pwd, + result, + )) => { + let pid = get_pid(&broker_name); + //处ç†Mqtt连接 + let mqtt_connection = MqttConnection::new( + connect, + Some(result), + socket_id, + client_id, + Some(keep_alive), + Some(is_clean_session), + user, + pwd, + ); + let mqtt_connection_ptr = Box::into_raw(Box::new(mqtt_connection)) as usize; + let json_object = object! { + "connection" => mqtt_connection_ptr, + "mqtt_event" => "connect", + "broker_name" => broker_name, + "socket_id" => socket_id, + }; + let data_str = json_object.dump(); + // listenerPIDå‘é€æ¶ˆæ¯ + send_to_process(None, pid, ProcessMsg::String(data_str)); + } + Args::OneArgs(MqttEvent::Disconnect(socket_id, broker_name, client_id, reason)) => { + let pid = get_pid(&broker_name); + //处ç†Mqttè¿žæŽ¥å…³é— + let json_object = object! { + "mqtt_event" => "disconnect", + "socket_id" => socket_id, + }; + let data_str = json_object.dump(); + // listenerPIDå‘é€æ¶ˆæ¯ + send_to_process(None, pid, ProcessMsg::String(data_str)); + } + _ => panic!("invalid MqttConnectHandler handler args"), + } + } +} + +// 处ç†mqtt 订阅,退订和消æ¯å‘布 +#[derive(Clone)] +struct MqttRequestHandler {} + +impl MqttRequestHandler { + pub fn new() -> Self { + MqttRequestHandler {} + } +} + +impl Handler for MqttRequestHandler { + type A = MqttEvent; + type B = (); + type C = (); + type D = (); + type E = (); + type F = (); + type G = (); + type H = (); + type HandleResult = (); + + fn handle( + &self, + env: Arc<dyn GrayVersion>, + _topic: Atom, + args: Args<Self::A, Self::B, Self::C, Self::D, Self::E, Self::F, Self::G, Self::H>, + ) -> Self::HandleResult { + let connect = unsafe { Arc::from_raw(Arc::into_raw(env) as *const MqttConnectHandle) }; + let session = connect.get_session().unwrap(); + let context = session.as_ref().get_context(); + // 获å–会è¯ä¸çš„pid TODO当å‰çš„vmä¸å˜åœ¨ï¼Œåº”该从ç°åº¦ä¸èŽ·å–vm实例,ç‰æŽ¥å£å®ŒæˆåŽå†ä¿®æ”¹ + let (pid, vm) = context.get::<(Pid, Vm)>().unwrap().as_ref().clone(); + let vm_copy = vm.clone(); + let context_v8 = ContextHandle(pid.1); + match args { + Args::OneArgs(MqttEvent::Sub( + _socket_id, + _broker_name, + _client_id, + topics, + _result, + )) => { + //处ç†Mqtt订阅主题 + let mut topics2 = vec![]; + for (sub_topic, _) in topics { + topics2.push(sub_topic); + } + // let json_object = object! { + // "mqtt_event" => "sub", + // "topics" => topics2 + // }; + // let data_str = json_object.dump(); + // // listenerPIDå‘é€æ¶ˆæ¯ + // send_to_process(None, pid, ProcessMsg::String(data_str)); + let topics_str = json::stringify(topics2); + vm.spawn_task(async move { + let topics = vm_copy + .new_js_string(context_v8, Some(topics_str)) + .await + .unwrap(); + vm_copy.callback(context_v8, "_$mqttSub", vec![topics]); + }); + } + Args::OneArgs(MqttEvent::Unsub(socket_id, broker_name, client_id, topics)) => { + //处ç†Mqtt退订主题 + // let json_object = object! { + // "mqtt_event" => "unsub", + // "topics" => topics + // }; + // let data_str = json_object.dump(); + // // listenerPIDå‘é€æ¶ˆæ¯ + // send_to_process(None, pid, ProcessMsg::String(data_str)); + let topics_str = json::stringify(topics); + vm.spawn_task(async move { + let topics = vm_copy + .new_js_string(context_v8, Some(topics_str)) + .await + .unwrap(); + vm_copy.callback(context_v8, "_$mqttUnSub", vec![topics]); + }); + } + Args::OneArgs(MqttEvent::Publish( + socket_id, + broker_name, + client_id, + address, + topic, + payload, + )) => { + //处ç†Mqttå‘布主题 + // let json_object = object! { + // "mqtt_event" => "publish", + // "topic" => topic, + // "payload" => payload.as_ref().to_vec() + // }; + // let data_str = json_object.dump(); + // // listenerPIDå‘é€æ¶ˆæ¯ + // send_to_process(None, pid, ProcessMsg::String(data_str)); + let payload_copy = payload.to_vec(); + // 虚拟机线程执行异æ¥ä»»åŠ¡ + vm.spawn_task(async move { + let buf = NativeArrayBuffer::from(payload_copy.into_boxed_slice()); + let payload = vm_copy + .native_buffer_to_js_array_buffer(context_v8, &buf) + .await + .unwrap(); + let topic = vm_copy + .new_js_string(context_v8, Some(topic)) + .await + .unwrap(); + vm_copy.callback(context_v8, "_$mqttSend", vec![topic, payload]); + }); + } + _ => panic!("invalid MqttRequestHandler handler args"), + } + } +} + +// 创建listenerPID +fn create_listener_pid(port: u16, broker_name: &String) { + // 判æ–pid是å¦å˜åœ¨ + // BUILD_LISTENER_TAB.read().get(broker_name) + if let None = BUILD_LISTENER_TAB.read().get(broker_name) { + // 获å–基础ç°åº¦å¯¹åº”çš„vm列表 TODO + // æ›´åŠ portå–余分é…vm TODO + let vm = create_init_vm(11, 111, None); + let vm_copy = vm.clone(); + let cid = vm.alloc_context_id(); + vm.spawn_task(async move { + let context = vm_copy.new_context(None, cid, None).await.unwrap(); + if let Err(e) = vm_copy + .execute( + context, + "start_listener_pid.js", + r#" + (<any>self)._$listener_set_receive();"#, + ) + .await + { + panic!(e); + } + }); + BUILD_LISTENER_TAB + .write() + .insert(broker_name.clone(), Pid(vm.get_vid(), cid)); + } +} + +// 绑定mqtt监å¬å™¨ +pub fn bind_mqtt_tcp_port(port: u16, use_tls: bool, protocol: String, broker_name: String) { + // 创建listenerPID + create_listener_pid(port, &broker_name); + let event_handler = Arc::new(MqttConnectHandler::new()); + let rpc_handler = Arc::new(MqttRequestHandler::new()); + let listener = Arc::new(MqttProxyListener::with_handler(Some(event_handler))); + let service = Arc::new(MqttProxyService::with_handler(Some(rpc_handler))); + + if use_tls { + let broker_factory = Arc::new(WssMqttBrokerFactory::new(&protocol, &broker_name, port)); + + SECURE_SERVICES.write().push(SecureServices(( + port.clone(), + Box::new(WebsocketListenerFactory::<FTlsSocket>::with_protocol_factory(broker_factory)), + ))); + } else { + let broker_factory = Arc::new(WsMqttBrokerFactory::new(&protocol, &broker_name, port)); + + INSECURE_SERVICES.write().push(InsecureServices(( + port.clone(), + Box::new(WebsocketListenerFactory::<TcpSocket>::with_protocol_factory(broker_factory)), + ))); + } + register_listener(&broker_name, listener); + register_service(&broker_name, service); +} + +#[derive(Clone)] +pub struct SecureHttpRpcRequestHandler {} + +#[derive(Clone)] +pub struct InsecureHttpRpcRequstHandler {} + +unsafe impl Send for SecureHttpRpcRequestHandler {} +unsafe impl Sync for SecureHttpRpcRequestHandler {} + +unsafe impl Send for InsecureHttpRpcRequstHandler {} +unsafe impl Sync for InsecureHttpRpcRequstHandler {} + +impl Handler for InsecureHttpRpcRequstHandler { + type A = SocketAddr; + type B = Arc<HeaderMap>; + type C = Arc<RefCell<XHashMap<String, SGenType>>>; + type D = ResponseHandler<TcpSocket>; + type E = (); + type F = (); + type G = (); + type H = (); + type HandleResult = (); + + //处ç†æ–¹æ³• + fn handle( + &self, + env: Arc<dyn GrayVersion>, + topic: Atom, + args: Args<Self::A, Self::B, Self::C, Self::D, Self::E, Self::F, Self::G, Self::H>, + ) -> Self::HandleResult { + } +} + +impl InsecureHttpRpcRequstHandler { + pub fn new() -> Self { + InsecureHttpRpcRequstHandler {} + } +} + +impl Handler for SecureHttpRpcRequestHandler { + type A = SocketAddr; + type B = Arc<HeaderMap>; + type C = Arc<RefCell<XHashMap<String, SGenType>>>; + type D = ResponseHandler<FTlsSocket>; + type E = (); + type F = (); + type G = (); + type H = (); + type HandleResult = (); + + //处ç†æ–¹æ³• + fn handle( + &self, + env: Arc<dyn GrayVersion>, + topic: Atom, + args: Args<Self::A, Self::B, Self::C, Self::D, Self::E, Self::F, Self::G, Self::H>, + ) -> Self::HandleResult { + } +} + +impl SecureHttpRpcRequestHandler { + pub fn new() -> Self { + SecureHttpRpcRequestHandler {} + } +} + +fn build_secure_service() -> Result<(), String> { + for (port, http_configs) in SECURE_HTTP_CONFIGS.read().iter() { + let handler = Arc::new(SecureHttpRpcRequestHandler::new()); + let http_port = Arc::new(HttpPort::with_handler(None, handler)); + + let r = build_service::<FTlsSocket>(port.clone(), http_configs, http_port); + SECURE_SERVICES.write().push(SecureServices(r)); + } + Ok(()) +} + +fn build_insecure_service() -> Result<(), String> { + for (port, http_configs) in INSECURE_HTTP_CONFIGS.read().iter() { + let handler = Arc::new(InsecureHttpRpcRequstHandler::new()); + let http_port = Arc::new(HttpPort::with_handler(None, handler)); + + let r = build_service::<TcpSocket>(port.clone(), http_configs, http_port); + INSECURE_SERVICES.write().push(InsecureServices(r)); + } + + Ok(()) +} + +// å¯åŠ¨ç½‘络æœåŠ¡ +pub fn start_network_services( + recv_buf_size: usize, + send_buf_size: usize, + read_buf_cap: usize, + write_buf_cap: usize, + pool_size: usize, + stack_size: usize, + timeout: usize, +) -> Result<(), String> { + // 准备安全æœåŠ¡é…ç½® + build_secure_service()?; + // 准备éžå®‰å…¨æœåŠ¡é…ç½® + build_insecure_service()?; + + let mut secure_services: Vec<( + u16, + TlsConfig, + Box< + dyn AsyncServiceFactory< + Connect = FTlsSocket, + Waits = AsyncWaitsHandle, + Out = (), + Future = BoxFuture<'static, ()>, + >, + >, + )> = vec![]; + for SecureServices((port, service)) in SECURE_SERVICES.write().drain(..).into_iter() { + println!("start_network_services secure service port = {:?}", port); + match CERTIFICATES.read().get(&port) { + Some((cert_path, priv_key_path)) => { + let tls_config = TlsConfig::new_server( + "", + false, + cert_path, + priv_key_path, + "", + "", + "", + 512, + false, + "", + ) + .unwrap(); + + secure_services.push((port, tls_config, service)); + } + None => panic!( + "port {:?} configured use TLS, but no certificate specified", + port + ), + } + } + + let mut insecure_services: Vec<( + u16, + Box< + dyn AsyncServiceFactory< + Connect = TcpSocket, + Waits = AsyncWaitsHandle, + Out = (), + Future = BoxFuture<'static, ()>, + >, + >, + )> = vec![]; + for InsecureServices((port, service)) in INSECURE_SERVICES.write().drain(..).into_iter() { + debug!("start_network_services insecure service port = {:?}", port); + insecure_services.push((port, service)); + } + + if insecure_services.len() > 0 { + global_bind_tcp_ports( + "0.0.0.0".to_string(), + insecure_services, + recv_buf_size, + send_buf_size, + read_buf_cap, + write_buf_cap, + pool_size, + stack_size, + timeout, + ); + } + + if secure_services.len() > 0 { + global_bind_tls_ports( + "0.0.0.0".to_string(), + secure_services, + recv_buf_size, + send_buf_size, + read_buf_cap, + write_buf_cap, + pool_size, + stack_size, + timeout, + ); + } + + Ok(()) +} + +#[derive(Debug, Clone)] +pub struct CorsAllow { + scheme: String, + host: String, + port: u16, + methods: Vec<String>, + max_age: Option<usize>, +} + +impl CorsAllow { + pub fn new( + scheme: String, + host: String, + port: u16, + methods: Vec<String>, + max_age: Option<usize>, + ) -> Self { + CorsAllow { + scheme, + host, + port, + methods, + max_age, + } + } +} + +#[derive(Debug, Default, Clone)] +pub struct HttpConfig { + // 多个主机å¯ä»¥å…±äº«ä¸€ä¸ªè·¯ç”±è¡¨ï¼Œ 也å¯ä»¥ä½¿ç”¨ä¸åŒçš„路由表 + virtual_hosts: RefCell<FnvHashMap<Vec<String>, Vec<HttpRouteTable>>>, + port: u16, + + keep_alive_timeout: usize, + + static_cache_max_size: usize, + static_cache_max_len: usize, + static_cache_collect_time: u64, + + // 是å¦å…许任æ„跨域 + cors: bool, + // 跨域白åå• + cors_allows: RefCell<Vec<CorsAllow>>, + + parser_min_plain_text_size: usize, + parse_compress_level: Option<u32>, + + multi_parts_block_size: usize, + + file_load_location: String, + file_load_need_cache: bool, + file_load_max_age: usize, + + files_load_location: String, + files_load_need_cache: bool, + files_load_max_age: usize, + + batch_load_location: String, + batch_load_need_cache: bool, + batch_load_max_age: usize, + + upload_file_location: String, + + // 安全连接还是éžå®‰å…¨è¿žæŽ¥ + http_port: bool, +} + +unsafe impl Send for HttpConfig {} +unsafe impl Sync for HttpConfig {} + +impl HttpConfig { + pub fn new() -> Self { + HttpConfig::default() + } + + pub fn bind_http_port(&mut self, port: u16) { + self.port = port; + } + + pub fn config_static_cache(&mut self, max_size: usize, max_len: usize, collect_time: u64) { + self.static_cache_max_size = max_size; + self.static_cache_max_len = max_len; + self.static_cache_collect_time = collect_time; + } + + pub fn config_set_keep_alive_timeout(&mut self, timeout: usize) { + self.keep_alive_timeout = timeout; + } + + pub fn config_cors(&mut self, enable: bool) { + self.cors = enable; + } + + pub fn add_cors_allow(&mut self, allow: CorsAllow) { + self.cors_allows.borrow_mut().push(allow); + } + + pub fn config_parser(&mut self, min_plain_text_size: usize, compress_level: Option<u32>) { + self.parser_min_plain_text_size = min_plain_text_size; + self.parse_compress_level = compress_level; + } + + pub fn config_multi_parts(&mut self, block_size: usize) { + self.multi_parts_block_size = block_size; + } + + pub fn config_file_load(&mut self, location: String, need_cache: bool, max_age: usize) { + self.file_load_location = location; + self.file_load_need_cache = need_cache; + self.file_load_max_age = max_age; + } + + pub fn config_files_load(&mut self, location: String, need_cache: bool, max_age: usize) { + self.files_load_location = location; + self.files_load_need_cache = need_cache; + self.files_load_max_age = max_age; + } + + pub fn config_batch_load(&mut self, location: String, need_cache: bool, max_age: usize) { + self.batch_load_location = location; + self.batch_load_need_cache = need_cache; + self.batch_load_max_age = max_age; + } + + pub fn config_upload_file(&mut self, location: String) { + self.upload_file_location = location; + } + + pub fn config_http_port(&mut self, secure: bool) { + self.http_port = secure; + } + + // ç»™è™šæ‹Ÿä¸»æœºæ·»åŠ è·¯ç”±è¡¨ + pub fn add_route_for_hosts( + &mut self, + virtual_hosts: Vec<String>, + endpoint: String, + methods: Vec<String>, + handler_name: String, + ) { + let route = HttpRouteTable::new(endpoint, methods, handler_name); + self.virtual_hosts + .borrow_mut() + .entry(virtual_hosts) + .and_modify(|routes| { + routes.push(route.clone()); + }) + .or_insert_with(|| vec![route]); + } +} + +#[derive(Debug, Clone)] +pub struct HttpRouteTable { + endpoint: String, + methods: Vec<String>, + handler_name: String, +} + +impl HttpRouteTable { + pub fn new(endpoint: String, methods: Vec<String>, handler_name: String) -> Self { + HttpRouteTable { + endpoint, + methods, + handler_name, + } + } +} + +/** +* 为指定地å€çš„指定端å£ï¼Œè®¾ç½®æŒ‡å®šçš„网络æœåŠ¡å·¥åŽ‚,并绑定对应的Tlsç«¯å£ +*/ +pub fn global_bind_tls_ports<S: SocketTrait + StreamTrait>( + ip: String, //绑定的本地ipåœ°å€ + binds: Vec<( + u16, + TlsConfig, + Box< + dyn AsyncServiceFactory< + Connect = S, + Waits = AsyncWaitsHandle, + Out = (), + Future = BoxFuture<'static, ()>, + >, + >, + )>, + recv_buffer_size: usize, //连接的接收缓冲区,å•ä½B + send_buffer_size: usize, //连接的å‘é€ç¼“冲区,å•ä½B + read_buffer_capacity: usize, //连接的读缓冲区,å•ä½B + write_buffer_capacity: usize, //连接的写缓冲区,å•ä½æ¬¡ + pool_size: usize, //è¿žæŽ¥æ± çš„åˆå§‹å®¹é‡ + stack_size: usize, //è¿žæŽ¥çº¿ç¨‹çš„æ ˆå¤§å° + timeout: usize, //连接轮询的间隔时长,å•ä½æ¯«ç§’ +) { + let mut ports = Vec::with_capacity(binds.len()); + let mut factory = AsyncPortsFactory::<S>::new(); + for (port, tls_cfg, service) in binds { + ports.push((port, tls_cfg)); + factory.bind(port, service); + } + + let mut config = SocketConfig::with_tls(&ip, ports.as_slice()); + config.set_option( + recv_buffer_size, + send_buffer_size, + read_buffer_capacity, + write_buffer_capacity, + ); + let buffer = WriteBufferPool::new(10000, 10, 3).ok().unwrap(); + match SocketListener::<S, _>::bind( + factory, + buffer, + config, + pool_size, + stack_size, + 1024, + Some(timeout), + ) { + Err(e) => { + panic!("Bind tcp port Error, reason: {:?}", e); + } + Ok(_) => { + info!( + "===> Bind tcp port ok, ports: {:?}", + ports + .iter() + .cloned() + .unzip::<_, _, Vec<u16>, Vec<TlsConfig>>() + .0 + ); + } + } +} + +/** +* 为指定地å€çš„指定端å£ï¼Œè®¾ç½®æŒ‡å®šçš„网络æœåŠ¡å·¥åŽ‚,并绑定对应的Tcpç«¯å£ +*/ +pub fn global_bind_tcp_ports<S: SocketTrait + StreamTrait>( + ip: String, //绑定的本地ipåœ°å€ + binds: Vec<( + u16, + Box< + dyn AsyncServiceFactory< + Connect = S, + Waits = AsyncWaitsHandle, + Out = (), + Future = BoxFuture<'static, ()>, + >, + >, + )>, + recv_buffer_size: usize, //连接的接收缓冲区,å•ä½B + send_buffer_size: usize, //连接的å‘é€ç¼“冲区,å•ä½B + read_buffer_capacity: usize, //连接的读缓冲区,å•ä½B + write_buffer_capacity: usize, //连接的写缓冲区,å•ä½æ¬¡ + pool_size: usize, //è¿žæŽ¥æ± çš„åˆå§‹å®¹é‡ + stack_size: usize, //è¿žæŽ¥çº¿ç¨‹çš„æ ˆå¤§å° + timeout: usize, //连接轮询的间隔时长,å•ä½æ¯«ç§’ +) { + let mut ports = Vec::with_capacity(binds.len()); + let mut factory = AsyncPortsFactory::<S>::new(); + for (port, service) in binds { + ports.push(port); + factory.bind(port, service); + } + + let mut config = SocketConfig::new(&ip, factory.bind_ports().as_slice()); + config.set_option( + recv_buffer_size, + send_buffer_size, + read_buffer_capacity, + write_buffer_capacity, + ); + let buffer = WriteBufferPool::new(10000, 10, 3).ok().unwrap(); + match SocketListener::<S, _>::bind( + factory, + buffer, + config, + pool_size, + stack_size, + 1024, + Some(timeout), + ) { + Err(e) => { + panic!("Bind tcp port Error, reason: {:?}", e); + } + Ok(_) => { + info!("===> Bind tcp port ok, ports: {:?}", ports); + } + } +} + +fn build_service<S: SocketTrait + StreamTrait>( + port: u16, + http_configs: &Vec<HttpConfig>, + http_port: Arc<HttpPort<S>>, +) -> ( + u16, + Box< + dyn AsyncServiceFactory< + Connect = S, + Waits = AsyncWaitsHandle, + Out = (), + Future = BoxFuture<'static, ()>, + >, + >, +) { + let mut hosts = VirtualHostTab::new(); + let mut keep_alive = 60000; + for http_config in http_configs { + if http_config.keep_alive_timeout > 0 { + keep_alive = http_config.keep_alive_timeout; + } + let enable_cache = http_config.static_cache_max_len > 0 + && http_config.static_cache_max_size > 0 + && http_config.static_cache_collect_time > 0; + let cors_handler = if http_config.cors { + Arc::new(CORSHandler::new( + "OPTIONS, GET, POST".to_string(), + Some(365 * 24 * 60 * 60), + )) + } else { + Arc::new(CORSHandler::new("OPTIONS, GET, POST".to_string(), None)) + }; + + if !http_config.cors { + for config in http_config.cors_allows.borrow().iter() { + if let Err(e) = cors_handler.allow_origin( + config.scheme.clone(), + config.host.clone(), + config.port, + &config.methods, + &[], + config.max_age, + ) { + panic!( + "failed to add origin, error = {:?}, config= {:?}", + e, config + ); + } + } + } + + let parser = Arc::new(DefaultParser::with( + http_config.parser_min_plain_text_size, + http_config.parse_compress_level, + )); + let multi_parts = Arc::new(MutilParts::with(http_config.multi_parts_block_size)); + let range_load = Arc::new(RangeLoad::new()); + + let file_load; + let files_load; + let batch_load; + + if enable_cache { + let cache = Arc::new(StaticCache::new( + http_config.static_cache_max_size, + http_config.static_cache_max_len, + )); + StaticCache::run_collect( + cache.clone(), + "http cache".to_string(), + http_config.static_cache_collect_time, + ); + file_load = Arc::new(FileLoad::new( + http_config.file_load_location.clone(), + Some(cache.clone()), + http_config.file_load_need_cache, + true, + true, + false, + http_config.file_load_max_age, + )); + files_load = Arc::new(FilesLoad::new( + http_config.files_load_location.clone(), + Some(cache.clone()), + http_config.files_load_need_cache, + true, + true, + false, + http_config.files_load_max_age, + )); + batch_load = Arc::new(BatchLoad::new( + http_config.batch_load_location.clone(), + Some(cache.clone()), + http_config.batch_load_need_cache, + true, + true, + false, + http_config.batch_load_max_age, + )); + } else { + file_load = Arc::new(FileLoad::new( + http_config.file_load_location.clone(), + None, + http_config.file_load_need_cache, + true, + true, + false, + http_config.file_load_max_age, + )); + files_load = Arc::new(FilesLoad::new( + http_config.files_load_location.clone(), + None, + http_config.files_load_need_cache, + true, + true, + false, + http_config.files_load_max_age, + )); + batch_load = Arc::new(BatchLoad::new( + http_config.batch_load_location.clone(), + None, + http_config.batch_load_need_cache, + true, + true, + false, + http_config.batch_load_max_age, + )); + } + + let upload = Arc::new(UploadFile::new(http_config.upload_file_location.clone())); + + //构建处ç†CORSçš„Options方法的请求的ä¸é—´ä»¶é“¾ + let mut chain = MiddlewareChain::new(); + chain.push_back(cors_handler.clone()); + chain.finish(); + let cors_middleware = Arc::new(chain); + + //构建处ç†æ–‡ä»¶åŠ 载的ä¸é—´ä»¶é“¾ + let mut chain = MiddlewareChain::new(); + chain.push_back(cors_handler.clone()); + chain.push_back(parser.clone()); + chain.push_back(range_load.clone()); + chain.push_back(file_load); + chain.finish(); + let file_load_middleware = Arc::new(chain); + + //构建处ç†æ–‡ä»¶æ‰¹é‡åŠ 载的ä¸é—´ä»¶é“¾ + let mut chain = MiddlewareChain::new(); + chain.push_back(cors_handler.clone()); + chain.push_back(parser.clone()); + chain.push_back(range_load.clone()); + chain.push_back(files_load); + chain.finish(); + let files_load_middleware = Arc::new(chain); + + //构建改进的处ç†æ–‡ä»¶æ‰¹é‡åŠ 载的ä¸é—´ä»¶é“¾ + let mut chain = MiddlewareChain::new(); + chain.push_back(cors_handler.clone()); + chain.push_back(parser.clone()); + chain.push_back(range_load.clone()); + chain.push_back(batch_load); + chain.finish(); + let batch_load_middleware = Arc::new(chain); + + //构建处ç†æ–‡ä»¶ä¸Šä¼ çš„ä¸é—´ä»¶é“¾ + let mut chain = MiddlewareChain::new(); + chain.push_back(cors_handler.clone()); + chain.push_back(parser.clone()); + chain.push_back(multi_parts.clone()); + chain.push_back(upload); + chain.finish(); + let upload_middleware = Arc::new(chain); + + //构建处ç†åŠ¨æ€èµ„æºè®¿é—®çš„ä¸é—´ä»¶é“¾ + let mut chain = MiddlewareChain::new(); + chain.push_back(cors_handler.clone()); + chain.push_back(parser); + chain.push_back(multi_parts.clone()); + chain.push_back(http_port.clone()); + chain.finish(); + let port_middleware = Arc::new(chain); + + for (vhs, routes) in http_config.virtual_hosts.borrow().iter() { + //构建路由 + let mut route = HttpRoute::new(); + route + .at("/") + .options(cors_middleware.clone()) + .at("/**") + .options(cors_middleware.clone()); + + for r in routes { + match r.handler_name.as_str() { + "fileLoad" => { + if r.methods.contains(&"GET".to_string()) { + route.at(&r.endpoint).get(file_load_middleware.clone()); + } + if r.methods.contains(&"POST".to_string()) { + route.at(&r.endpoint).post(file_load_middleware.clone()); + } + if r.methods.contains(&"OPTIONS".to_string()) { + route.at(&r.endpoint).options(file_load_middleware.clone()); + } + } + + "filesLoad" => { + if r.methods.contains(&"GET".to_string()) { + route.at(&r.endpoint).get(files_load_middleware.clone()); + } + if r.methods.contains(&"POST".to_string()) { + route.at(&r.endpoint).post(files_load_middleware.clone()); + } + if r.methods.contains(&"OPTIONS".to_string()) { + route.at(&r.endpoint).options(files_load_middleware.clone()); + } + } + + "batchLoad" => { + if r.methods.contains(&"GET".to_string()) { + route.at(&r.endpoint).get(batch_load_middleware.clone()); + } + if r.methods.contains(&"POST".to_string()) { + route.at(&r.endpoint).post(batch_load_middleware.clone()); + } + if r.methods.contains(&"OPTIONS".to_string()) { + route.at(&r.endpoint).options(batch_load_middleware.clone()); + } + } + + "upload" => { + if r.methods.contains(&"GET".to_string()) { + route.at(&r.endpoint).get(upload_middleware.clone()); + } + if r.methods.contains(&"POST".to_string()) { + route.at(&r.endpoint).post(upload_middleware.clone()); + } + if r.methods.contains(&"OPTIONS".to_string()) { + route.at(&r.endpoint).options(upload_middleware.clone()); + } + } + + "port" => { + if r.methods.contains(&"GET".to_string()) { + route.at(&r.endpoint).get(port_middleware.clone()); + } + if r.methods.contains(&"POST".to_string()) { + route.at(&r.endpoint).post(port_middleware.clone()); + } + if r.methods.contains(&"OPTIONS".to_string()) { + route.at(&r.endpoint).options(port_middleware.clone()); + } + } + + _ => { + panic!("unsupported secure middleware"); + } + } + } + + //构建虚拟主机 + let host = VirtualHost::with(route); + + // 多个主机共享一个路由表 + for vh in vhs { + println!( + "add insecure host = {:?}, port = {:?}", + vh, http_config.port + ); + let _ = hosts.add(vh, host.clone()); + } + } + } + + ( + port.clone(), + Box::new(HttpListenerFactory::with_hosts(hosts.clone(), keep_alive)), + ) +} + +// 解æžhttpé…ç½® +pub fn parse_http_config(jstr: String) { + // 环境å˜é‡çš„ip是以分å·åˆ†éš”çš„å—符串 + let replace_ip = match env::var("PTCONFIG_IP") { + Ok(ip) => Some(ip), + Err(_) => None, + }; + + match json::parse(&jstr) { + Ok(jobj) => { + for config in jobj["httpConfig"].members() { + let mut http_config = HttpConfig::new(); + + let http_port = config["httpPort"].as_bool().unwrap(); + http_config.config_http_port(config["httpPort"].as_bool().unwrap()); + + // 如果é…置了环境å˜é‡PTCONFIG_IP,则新增虚拟主机 + let virtual_host = match replace_ip.clone() { + Some(ip) => { + // 如果是 https é…ç½®, ä¸è¦ç”¨ ip æ›¿æ¢ + if http_port { + config["virtualHost"] + .members() + .map(|s| s.to_string()) + .collect::<Vec<String>>() + } else { + ip.split(";") + .map(|s| s.to_string()) + .collect::<Vec<String>>() + } + } + None => config["virtualHost"] + .members() + .map(|s| s.to_string()) + .collect::<Vec<String>>(), + }; + + let mut static_cache_collect_time: u64 = 0; + let mut static_cache_max_size: usize = 0; + let mut static_cache_max_len: usize = 0; + for (key, val) in config["staticCache"].entries() { + match key { + "maxSize" => static_cache_max_size = val.as_usize().unwrap(), + "maxLen" => static_cache_max_len = val.as_usize().unwrap(), + "collectTime" => static_cache_collect_time = val.as_u64().unwrap(), + _ => warn!("unknown field"), + } + } + http_config.config_static_cache( + static_cache_max_size, + static_cache_max_len, + static_cache_collect_time, + ); + + http_config.config_cors(config["CORS"].as_bool().unwrap()); + + for cors_allow in config["CORSAllows"].members() { + let scheme = cors_allow["scheme"].as_str().unwrap().to_string(); + let host = cors_allow["host"].as_str().unwrap().to_string(); + let port = cors_allow["port"].as_u16().unwrap(); + let methods = cors_allow["methods"] + .members() + .map(|s| s.to_string()) + .collect::<Vec<String>>(); + let max_age = cors_allow["maxAge"].as_usize(); + + // 如果é…置了环境å˜é‡PTCONFIG_IP,则需è¦æ–°å¢žè·¨åŸŸè§„则 + if let Some(ips) = replace_ip.clone() { + // éžhttps跨域é…ç½® + if !http_port { + for ip in ips.split(";") { + let c = CorsAllow::new( + scheme.clone(), + ip.to_string(), + port, + methods.clone(), + max_age, + ); + http_config.add_cors_allow(c); + } + } + } + let c = CorsAllow::new(scheme, host, port, methods, max_age); + http_config.add_cors_allow(c); + } + + let port = config["port"].as_u16().unwrap(); + http_config.bind_http_port(port); + http_config + .config_set_keep_alive_timeout(config["keepAliveTimeout"].as_usize().unwrap()); + + let mut parser_min_plain_text_size: usize = 0; + let mut parse_compress_level: Option<u32> = None; + for (key, val) in config["parser"].entries() { + match key { + "minPlainTextSize" => parser_min_plain_text_size = val.as_usize().unwrap(), + "compressLevel" => parse_compress_level = val.as_u32(), + _ => warn!("unknown field"), + } + } + http_config.config_parser(parser_min_plain_text_size, parse_compress_level); + + let mut multi_parts_block_size: usize = 0; + for (key, val) in config["mutilParts"].entries() { + match key { + "blockSize" => multi_parts_block_size = val.as_usize().unwrap(), + _ => warn!("unknown field"), + } + } + http_config.config_multi_parts(multi_parts_block_size); + + let mut file_load_location: String = "".to_string(); + let mut file_load_need_cache: bool = false; + let mut file_load_max_age: usize = 0; + for (key, val) in config["fileLoad"].entries() { + match key { + "location" => file_load_location = val.as_str().unwrap().to_string(), + "needCache" => file_load_need_cache = val.as_bool().unwrap(), + "maxAge" => file_load_max_age = val.as_usize().unwrap(), + _ => warn!("unknown field {:?}", key), + } + } + http_config.config_file_load( + file_load_location, + file_load_need_cache, + file_load_max_age, + ); + + let mut files_load_location: String = "".to_string(); + let mut files_load_need_cache: bool = false; + let mut files_load_max_age: usize = 0; + for (key, val) in config["filesLoad"].entries() { + match key { + "location" => files_load_location = val.as_str().unwrap().to_string(), + "needCache" => files_load_need_cache = val.as_bool().unwrap(), + "maxAge" => files_load_max_age = val.as_usize().unwrap(), + _ => warn!("unknown field {:?}", key), + } + } + http_config.config_files_load( + files_load_location, + files_load_need_cache, + files_load_max_age, + ); + + let mut batch_load_location: String = "".to_string(); + let mut batch_load_need_cache: bool = false; + let mut batch_load_max_age: usize = 0; + for (key, val) in config["batchLoad"].entries() { + match key { + "location" => batch_load_location = val.as_str().unwrap().to_string(), + "needCache" => batch_load_need_cache = val.as_bool().unwrap(), + "maxAge" => batch_load_max_age = val.as_usize().unwrap(), + _ => warn!("unknown field {:?}", key), + } + } + http_config.config_batch_load( + batch_load_location, + batch_load_need_cache, + batch_load_max_age, + ); + + for (key, val) in config["uploadFile"].entries() { + match key { + "location" => { + http_config.config_upload_file(val.as_str().unwrap().to_string()) + } + _ => warn!("unknown field {:?}", key), + } + } + + for route in config["routeTable"].members() { + let endpoint = route["endpoint"].as_str().unwrap().to_string(); + let methods = route["methods"] + .members() + .map(|s| s.to_string()) + .collect::<Vec<String>>(); + let handler_name = route["handlerName"].as_str().unwrap().to_string(); + http_config.add_route_for_hosts( + virtual_host.clone(), + endpoint, + methods, + handler_name, + ); + } + debug!("parsed http config ----- {:?}", http_config); + if http_port { + SECURE_HTTP_CONFIGS + .write() + .entry(port) + .and_modify(|configs| configs.push(http_config.clone())) + .or_insert(vec![http_config]); + } else { + INSECURE_HTTP_CONFIGS + .write() + .entry(port) + .and_modify(|configs| configs.push(http_config.clone())) + .or_insert(vec![http_config]); + } + } + } + + Err(e) => { + panic!( + "JSON parse error, please make sure it is a json string: {:?}, error: {:?}", + jstr, e + ); + } + } +} + +// é…ç½®è¯ä¹¦ +pub fn config_certificate(port: u16, cert_path: String, priv_key_path: String) { + CERTIFICATES + .write() + .insert(port, (cert_path, priv_key_path)); +} diff --git a/src/main.rs b/src/main.rs index b8055763dbbaa23d03ea4e2568eee66b3719d2a2..d0f8af805bae80031f7239d6f8478740d27d8c6b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,8 @@ extern crate log; #[macro_use] extern crate lazy_static; +#[macro_use] +extern crate json; use std::path::{Path, PathBuf}; use std::sync::{ @@ -35,12 +37,14 @@ use vm_builtin::{ContextHandle, VmStartupSnapshot}; use vm_core::{debug, init_v8, vm, worker}; use ws::server::WebsocketListenerFactory; -use pi_serv_lib::set_pi_serv_lib_file_runtime; use pi_serv_ext::register_ext_functions; +use pi_serv_lib::set_pi_serv_lib_file_runtime; mod init; +mod js_net; use init::init_js; +use js_net::reg_pi_serv_handle; lazy_static! { //主线程è¿è¡ŒçŠ¶æ€å’Œçº¿ç¨‹æ— æ¡ä»¶ä¼‘çœ è¶…æ—¶æ—¶é•¿ @@ -303,6 +307,8 @@ async fn async_main( // 注册文件异æ¥è¿è¡Œæ—¶ set_pi_serv_lib_file_runtime(FILES_ASYNC_RUNTIME.clone()); + // 注册pi_serv方法 + reg_pi_serv_handle(); let snapshot_context = init_snapshot(&init_vm).await;