diff --git a/crates/app/src/routes/api/v1/channels/messages.rs b/crates/app/src/routes/api/v1/channels/messages.rs index 67d75f9..534aafd 100644 --- a/crates/app/src/routes/api/v1/channels/messages.rs +++ b/crates/app/src/routes/api/v1/channels/messages.rs @@ -44,7 +44,7 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) { let (sender, receiver) = mpsc::channel::(); // forward messages from mpsc to the sink - tokio::spawn(async move { + let mut forward_task = tokio::spawn(async move { while let Ok(message) = receiver.recv() { if message == "Close" { let _ = sink.close().await; @@ -60,7 +60,7 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) { // ping let ping_sender = sender.clone(); - tokio::spawn(async move { + let mut heartbeat_task = tokio::spawn(async move { let mut heartbeat = tokio::time::interval(Duration::from_secs(30)); loop { @@ -189,10 +189,28 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) { // ... let close_sender = sender.clone(); tokio::select! { - _ = (&mut send_task) => recv_task.abort(), + _ = (&mut heartbeat_task) => { + let _ = close_sender.send("Close".to_string()); + forward_task.abort(); + recv_task.abort(); + send_task.abort(); + } + _ = (&mut forward_task) => { + send_task.abort(); + recv_task.abort(); + heartbeat_task.abort(); + } + _ = (&mut send_task) => { + let _ = close_sender.send("Close".to_string()); + forward_task.abort(); + recv_task.abort(); + heartbeat_task.abort(); + }, _ = (&mut recv_task) => { let _ = close_sender.send("Close".to_string()); - send_task.abort() + send_task.abort(); + forward_task.abort(); + heartbeat_task.abort(); }, }; }