diff --git a/crates/app/src/main.rs b/crates/app/src/main.rs index 236627d..e94f0a5 100644 --- a/crates/app/src/main.rs +++ b/crates/app/src/main.rs @@ -34,7 +34,7 @@ fn check_supporter(value: &Value, _: &HashMap) -> tera::Result - if (window.socket) { - window.socket.close(); - window.socket = undefined; - console.log("closed old"); - } - setTimeout(() => { if (window.socket) { if (window.socket_id === "{{ selected_channel }}") { @@ -460,6 +454,18 @@ } } + for (const anchor of document.querySelectorAll("a")) { + if (anchor.href.includes("{{ selected_channel }}")) { + continue; + } + + anchor.addEventListener("click", () => { + window.socket.close(); + window.socket = undefined; + console.log("force abandon socket"); + }); + } + const endpoint = `${window.location.origin.replace("http", "ws")}/api/v1/channels/{{ selected_channel }}/ws`; const socket = new WebSocket(endpoint); window.socket = socket; diff --git a/crates/app/src/routes/api/v1/channels/messages.rs b/crates/app/src/routes/api/v1/channels/messages.rs index 534aafd..058f9a8 100644 --- a/crates/app/src/routes/api/v1/channels/messages.rs +++ b/crates/app/src/routes/api/v1/channels/messages.rs @@ -48,7 +48,6 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) { while let Ok(message) = receiver.recv() { if message == "Close" { let _ = sink.close().await; - drop(receiver); break; } @@ -56,20 +55,21 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) { break; } } + + drop(receiver); + drop(sink); }); // ping let ping_sender = sender.clone(); let mut heartbeat_task = tokio::spawn(async move { - let mut heartbeat = tokio::time::interval(Duration::from_secs(30)); + let mut heartbeat = tokio::time::interval(Duration::from_secs(10)); - loop { + while ping_sender.send("Ping".to_string()).is_ok() { heartbeat.tick().await; - if ping_sender.send("Ping".to_string()).is_err() { - // remote has abandoned us - break; - } } + + drop(ping_sender); }); // ... @@ -168,6 +168,8 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) { } } } + + drop(recv_sender); }); // forward messages from redis to the mpsc @@ -176,14 +178,14 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) { let mut pubsub = con.as_pubsub(); pubsub.subscribe(channel_id).unwrap(); - loop { - while let Ok(msg) = pubsub.get_message() { - // payload is a stringified SocketMessage - if send_task_sender.send(msg.get_payload().unwrap()).is_err() { - break; - } + while let Ok(msg) = pubsub.get_message() { + // payload is a stringified SocketMessage + if send_task_sender.send(msg.get_payload().unwrap()).is_err() { + break; } } + + drop(send_task_sender); }); // ... @@ -211,8 +213,20 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) { send_task.abort(); forward_task.abort(); heartbeat_task.abort(); - }, + } }; + + // kill + drop(sender); + drop(db); + + send_task.abort(); + recv_task.abort(); + forward_task.abort(); + heartbeat_task.abort(); + + let _ = close_sender.send("Close".to_string()); + tracing::info!("socket terminate"); } pub async fn create_request(