From 67a232cdf76625cd9dde3562d3d0e32a93a4898f Mon Sep 17 00:00:00 2001 From: jiangfb <jayphbee@163.com> Date: Mon, 4 Jan 2021 17:46:36 +0800 Subject: [PATCH 1/2] get all pids --- src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index f04adcc..ac47de9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -57,7 +57,7 @@ use pi_core::{ use pi_core_builtin::set_external_async_runtime; use pi_core_lib::set_file_async_runtime; use pi_serv_ext::register_ext_functions; -use pi_serv_lib::{js_db::global_db_mgr, js_gray::GRAY_MGR}; +use pi_serv_lib::{js_db::global_db_mgr, js_gray::{VID_CONTEXTS, GRAY_MGR}}; use pi_serv_lib::{set_pi_serv_lib_file_runtime, set_pi_serv_lib_main_async_runtime, set_store_runtime}; #[cfg(feature = "profiling_heap")] use profiling_pi_core::{ @@ -103,7 +103,6 @@ lazy_static! { static ref MQTT_PORTS: Arc<Mutex<Vec<(u16, String)>>> = Arc::new(Mutex::new(vec![])); //Http端å£ä»£ç†æ˜ 射表 static ref HTTP_PORTS: Arc<Mutex<Vec<(u16, String)>>> = Arc::new(Mutex::new(vec![])); - static ref VID_CONTEXTS: Arc<Mutex<XHashMap<usize, Vec<ContextHandle>>>> = Arc::new(Mutex::new(XHashMap::default())); //æŽ§åˆ¶å° static ref CONSOLE_SHELL: RwLock<Option<ConsoleShell>> = RwLock::new(None); -- GitLab From ed755affdd094ff8074a146858596987564ba529 Mon Sep 17 00:00:00 2001 From: jiangfb <jayphbee@163.com> Date: Thu, 7 Jan 2021 18:12:30 +0800 Subject: [PATCH 2/2] fix debug mode mqtt subscribe bug --- src/js_net.rs | 46 ++++++++++++++++++++++++---------------------- src/main.rs | 12 +++++++++--- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/src/js_net.rs b/src/js_net.rs index 77a65d9..b798df4 100644 --- a/src/js_net.rs +++ b/src/js_net.rs @@ -440,29 +440,31 @@ pub fn create_http_pid(host: &String, port: u16) { // 绑定mqtt监å¬å™¨ pub fn bind_mqtt_tcp_port(port: u16, use_tls: bool, protocol: String, broker_name: String) { - MQTT_PORTS.lock().push((port, broker_name.clone())); - let event_handler = Arc::new(MqttConnectHandler::new()); - let rpc_handler = Arc::new(MqttRequestHandler::new()); - let listener = Arc::new(MqttProxyListener::with_handler(Some(event_handler))); - let service = Arc::new(MqttProxyService::with_handler(Some(rpc_handler))); - - if use_tls { - let broker_factory = Arc::new(WssMqttBrokerFactory::new(&protocol, &broker_name, port)); - - SECURE_SERVICES.write().push(SecureServices(( - port.clone(), - Box::new(WebsocketListenerFactory::<FTlsSocket>::with_protocol_factory(broker_factory)), - ))); - } else { - let broker_factory = Arc::new(WsMqttBrokerFactory::new(&protocol, &broker_name, port)); - - INSECURE_SERVICES.write().push(InsecureServices(( - port.clone(), - Box::new(WebsocketListenerFactory::<TcpSocket>::with_protocol_factory(broker_factory)), - ))); + let inserted = MQTT_PORTS.lock().insert((port, broker_name.clone())); + if inserted { + let event_handler = Arc::new(MqttConnectHandler::new()); + let rpc_handler = Arc::new(MqttRequestHandler::new()); + let listener = Arc::new(MqttProxyListener::with_handler(Some(event_handler))); + let service = Arc::new(MqttProxyService::with_handler(Some(rpc_handler))); + + if use_tls { + let broker_factory = Arc::new(WssMqttBrokerFactory::new(&protocol, &broker_name, port)); + + SECURE_SERVICES.write().push(SecureServices(( + port.clone(), + Box::new(WebsocketListenerFactory::<FTlsSocket>::with_protocol_factory(broker_factory)), + ))); + } else { + let broker_factory = Arc::new(WsMqttBrokerFactory::new(&protocol, &broker_name, port)); + + INSECURE_SERVICES.write().push(InsecureServices(( + port.clone(), + Box::new(WebsocketListenerFactory::<TcpSocket>::with_protocol_factory(broker_factory)), + ))); + } + register_listener(&broker_name, listener); + register_service(&broker_name, service); } - register_listener(&broker_name, listener); - register_service(&broker_name, service); } // httpMsg包装 diff --git a/src/main.rs b/src/main.rs index ac47de9..ed12723 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,7 @@ extern crate pi_core; #[macro_use] extern crate profiling_pi_core; +use std::collections::HashSet; use std::path::{Path, PathBuf}; use std::sync::{ atomic::{AtomicBool, Ordering}, @@ -57,8 +58,13 @@ use pi_core::{ use pi_core_builtin::set_external_async_runtime; use pi_core_lib::set_file_async_runtime; use pi_serv_ext::register_ext_functions; -use pi_serv_lib::{js_db::global_db_mgr, js_gray::{VID_CONTEXTS, GRAY_MGR}}; -use pi_serv_lib::{set_pi_serv_lib_file_runtime, set_pi_serv_lib_main_async_runtime, set_store_runtime}; +use pi_serv_lib::{ + js_db::global_db_mgr, + js_gray::{GRAY_MGR, VID_CONTEXTS}, +}; +use pi_serv_lib::{ + set_pi_serv_lib_file_runtime, set_pi_serv_lib_main_async_runtime, set_store_runtime, +}; #[cfg(feature = "profiling_heap")] use profiling_pi_core::{ console::{set_console_shell_ctrlc_handler, ConsoleShell, ConsoleShellBuilder}, @@ -100,7 +106,7 @@ lazy_static! { pool.startup(false) }; //Mqtt端å£ä»£ç†æ˜ 射表 - static ref MQTT_PORTS: Arc<Mutex<Vec<(u16, String)>>> = Arc::new(Mutex::new(vec![])); + static ref MQTT_PORTS: Arc<Mutex<HashSet<(u16, String)>>> = Arc::new(Mutex::new(HashSet::new())); //Http端å£ä»£ç†æ˜ 射表 static ref HTTP_PORTS: Arc<Mutex<Vec<(u16, String)>>> = Arc::new(Mutex::new(vec![])); -- GitLab