fix: attempt to stop spamming websocket

socket needs major fixes because of weird tokio stuff
This commit is contained in:
trisua 2025-04-28 01:30:03 -04:00
parent 93c4093620
commit 3a12c0ee6c
3 changed files with 41 additions and 21 deletions

View file

@ -34,7 +34,7 @@ fn check_supporter(value: &Value, _: &HashMap<String, Value>) -> tera::Result<Va
.into())
}
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
#[tokio::main(flavor = "multi_thread")]
async fn main() {
tracing_subscriber::fmt()
.with_target(false)

View file

@ -442,12 +442,6 @@
{% if selected_channel %}
<script>
if (window.socket) {
window.socket.close();
window.socket = undefined;
console.log("closed old");
}
setTimeout(() => {
if (window.socket) {
if (window.socket_id === "{{ selected_channel }}") {
@ -460,6 +454,18 @@
}
}
for (const anchor of document.querySelectorAll("a")) {
if (anchor.href.includes("{{ selected_channel }}")) {
continue;
}
anchor.addEventListener("click", () => {
window.socket.close();
window.socket = undefined;
console.log("force abandon socket");
});
}
const endpoint = `${window.location.origin.replace("http", "ws")}/api/v1/channels/{{ selected_channel }}/ws`;
const socket = new WebSocket(endpoint);
window.socket = socket;

View file

@ -48,7 +48,6 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) {
while let Ok(message) = receiver.recv() {
if message == "Close" {
let _ = sink.close().await;
drop(receiver);
break;
}
@ -56,20 +55,21 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) {
break;
}
}
drop(receiver);
drop(sink);
});
// ping
let ping_sender = sender.clone();
let mut heartbeat_task = tokio::spawn(async move {
let mut heartbeat = tokio::time::interval(Duration::from_secs(30));
let mut heartbeat = tokio::time::interval(Duration::from_secs(10));
loop {
while ping_sender.send("Ping".to_string()).is_ok() {
heartbeat.tick().await;
if ping_sender.send("Ping".to_string()).is_err() {
// remote has abandoned us
break;
}
}
drop(ping_sender);
});
// ...
@ -168,6 +168,8 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) {
}
}
}
drop(recv_sender);
});
// forward messages from redis to the mpsc
@ -176,14 +178,14 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) {
let mut pubsub = con.as_pubsub();
pubsub.subscribe(channel_id).unwrap();
loop {
while let Ok(msg) = pubsub.get_message() {
// payload is a stringified SocketMessage
if send_task_sender.send(msg.get_payload().unwrap()).is_err() {
break;
}
while let Ok(msg) = pubsub.get_message() {
// payload is a stringified SocketMessage
if send_task_sender.send(msg.get_payload().unwrap()).is_err() {
break;
}
}
drop(send_task_sender);
});
// ...
@ -211,8 +213,20 @@ pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) {
send_task.abort();
forward_task.abort();
heartbeat_task.abort();
},
}
};
// kill
drop(sender);
drop(db);
send_task.abort();
recv_task.abort();
forward_task.abort();
heartbeat_task.abort();
let _ = close_sender.send("Close".to_string());
tracing::info!("socket terminate");
}
pub async fn create_request(