From 67a232cdf76625cd9dde3562d3d0e32a93a4898f Mon Sep 17 00:00:00 2001
From: jiangfb <jayphbee@163.com>
Date: Mon, 4 Jan 2021 17:46:36 +0800
Subject: [PATCH 1/2] get all pids

---
 src/main.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index f04adcc..ac47de9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -57,7 +57,7 @@ use pi_core::{
 use pi_core_builtin::set_external_async_runtime;
 use pi_core_lib::set_file_async_runtime;
 use pi_serv_ext::register_ext_functions;
-use pi_serv_lib::{js_db::global_db_mgr, js_gray::GRAY_MGR};
+use pi_serv_lib::{js_db::global_db_mgr, js_gray::{VID_CONTEXTS, GRAY_MGR}};
 use pi_serv_lib::{set_pi_serv_lib_file_runtime, set_pi_serv_lib_main_async_runtime, set_store_runtime};
 #[cfg(feature = "profiling_heap")]
 use profiling_pi_core::{
@@ -103,7 +103,6 @@ lazy_static! {
     static ref MQTT_PORTS: Arc<Mutex<Vec<(u16, String)>>> = Arc::new(Mutex::new(vec![]));
     //Http端口代理映射表
     static ref HTTP_PORTS: Arc<Mutex<Vec<(u16, String)>>> = Arc::new(Mutex::new(vec![]));
-    static ref VID_CONTEXTS: Arc<Mutex<XHashMap<usize, Vec<ContextHandle>>>> = Arc::new(Mutex::new(XHashMap::default()));
 
     //控制台
     static ref CONSOLE_SHELL: RwLock<Option<ConsoleShell>> = RwLock::new(None);
-- 
GitLab


From ed755affdd094ff8074a146858596987564ba529 Mon Sep 17 00:00:00 2001
From: jiangfb <jayphbee@163.com>
Date: Thu, 7 Jan 2021 18:12:30 +0800
Subject: [PATCH 2/2] fix debug mode mqtt subscribe bug

---
 src/js_net.rs | 46 ++++++++++++++++++++++++----------------------
 src/main.rs   | 12 +++++++++---
 2 files changed, 33 insertions(+), 25 deletions(-)

diff --git a/src/js_net.rs b/src/js_net.rs
index 77a65d9..b798df4 100644
--- a/src/js_net.rs
+++ b/src/js_net.rs
@@ -440,29 +440,31 @@ pub fn create_http_pid(host: &String, port: u16) {
 
 // 绑定mqtt监听器
 pub fn bind_mqtt_tcp_port(port: u16, use_tls: bool, protocol: String, broker_name: String) {
-    MQTT_PORTS.lock().push((port, broker_name.clone()));
-    let event_handler = Arc::new(MqttConnectHandler::new());
-    let rpc_handler = Arc::new(MqttRequestHandler::new());
-    let listener = Arc::new(MqttProxyListener::with_handler(Some(event_handler)));
-    let service = Arc::new(MqttProxyService::with_handler(Some(rpc_handler)));
-
-    if use_tls {
-        let broker_factory = Arc::new(WssMqttBrokerFactory::new(&protocol, &broker_name, port));
-
-        SECURE_SERVICES.write().push(SecureServices((
-            port.clone(),
-            Box::new(WebsocketListenerFactory::<FTlsSocket>::with_protocol_factory(broker_factory)),
-        )));
-    } else {
-        let broker_factory = Arc::new(WsMqttBrokerFactory::new(&protocol, &broker_name, port));
-
-        INSECURE_SERVICES.write().push(InsecureServices((
-            port.clone(),
-            Box::new(WebsocketListenerFactory::<TcpSocket>::with_protocol_factory(broker_factory)),
-        )));
+    let inserted = MQTT_PORTS.lock().insert((port, broker_name.clone()));
+    if inserted {
+        let event_handler = Arc::new(MqttConnectHandler::new());
+        let rpc_handler = Arc::new(MqttRequestHandler::new());
+        let listener = Arc::new(MqttProxyListener::with_handler(Some(event_handler)));
+        let service = Arc::new(MqttProxyService::with_handler(Some(rpc_handler)));
+    
+        if use_tls {
+            let broker_factory = Arc::new(WssMqttBrokerFactory::new(&protocol, &broker_name, port));
+    
+            SECURE_SERVICES.write().push(SecureServices((
+                port.clone(),
+                Box::new(WebsocketListenerFactory::<FTlsSocket>::with_protocol_factory(broker_factory)),
+            )));
+        } else {
+            let broker_factory = Arc::new(WsMqttBrokerFactory::new(&protocol, &broker_name, port));
+    
+            INSECURE_SERVICES.write().push(InsecureServices((
+                port.clone(),
+                Box::new(WebsocketListenerFactory::<TcpSocket>::with_protocol_factory(broker_factory)),
+            )));
+        }
+        register_listener(&broker_name, listener);
+        register_service(&broker_name, service);
     }
-    register_listener(&broker_name, listener);
-    register_service(&broker_name, service);
 }
 
 // httpMsg包装
diff --git a/src/main.rs b/src/main.rs
index ac47de9..ed12723 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,6 +16,7 @@ extern crate pi_core;
 #[macro_use]
 extern crate profiling_pi_core;
 
+use std::collections::HashSet;
 use std::path::{Path, PathBuf};
 use std::sync::{
     atomic::{AtomicBool, Ordering},
@@ -57,8 +58,13 @@ use pi_core::{
 use pi_core_builtin::set_external_async_runtime;
 use pi_core_lib::set_file_async_runtime;
 use pi_serv_ext::register_ext_functions;
-use pi_serv_lib::{js_db::global_db_mgr, js_gray::{VID_CONTEXTS, GRAY_MGR}};
-use pi_serv_lib::{set_pi_serv_lib_file_runtime, set_pi_serv_lib_main_async_runtime, set_store_runtime};
+use pi_serv_lib::{
+    js_db::global_db_mgr,
+    js_gray::{GRAY_MGR, VID_CONTEXTS},
+};
+use pi_serv_lib::{
+    set_pi_serv_lib_file_runtime, set_pi_serv_lib_main_async_runtime, set_store_runtime,
+};
 #[cfg(feature = "profiling_heap")]
 use profiling_pi_core::{
     console::{set_console_shell_ctrlc_handler, ConsoleShell, ConsoleShellBuilder},
@@ -100,7 +106,7 @@ lazy_static! {
         pool.startup(false)
     };
     //Mqtt端口代理映射表
-    static ref MQTT_PORTS: Arc<Mutex<Vec<(u16, String)>>> = Arc::new(Mutex::new(vec![]));
+    static ref MQTT_PORTS: Arc<Mutex<HashSet<(u16, String)>>> = Arc::new(Mutex::new(HashSet::new()));
     //Http端口代理映射表
     static ref HTTP_PORTS: Arc<Mutex<Vec<(u16, String)>>> = Arc::new(Mutex::new(vec![]));
 
-- 
GitLab