From c48cf78314443a67ba8fcb86593efafb466f6088 Mon Sep 17 00:00:00 2001 From: trisua Date: Sun, 24 Aug 2025 17:58:39 -0400 Subject: [PATCH] add: socket --- Cargo.lock | 1 + Cargo.toml | 1 + app/public/style.css | 15 +++- app/templates_src/components.lisp | 9 +++ app/templates_src/login.lisp | 4 +- app/templates_src/root.lisp | 9 ++- src/database/messages.rs | 51 ++++++++++++- src/model.rs | 28 ++++++- src/routes/api/chats.rs | 121 +++++++++++++++++++++++++++++- src/routes/api/mod.rs | 1 + 10 files changed, 227 insertions(+), 13 deletions(-) create mode 100644 app/templates_src/components.lisp diff --git a/Cargo.lock b/Cargo.lock index e01f64b..04404b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2923,6 +2923,7 @@ dependencies = [ "axum-image", "buckets-core", "dotenv", + "futures-util", "glob", "nanoneo", "oiseau", diff --git a/Cargo.toml b/Cargo.toml index c98f71f..2738940 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,4 @@ regex = "1.11.1" oiseau = { version = "0.1.2", default-features = false, features = ["postgres", "redis",] } buckets-core = "1.0.4" axum-image = "0.1.1" +futures-util = "0.3.31" diff --git a/app/public/style.css b/app/public/style.css index 8abc8ce..2c6aa73 100644 --- a/app/public/style.css +++ b/app/public/style.css @@ -140,6 +140,11 @@ nav.sticky { width: 100%; } +.container.small { + margin: 0 auto; + max-width: 42ch; +} + .content_container { margin: 0 auto var(--pad-2); width: 100%; @@ -327,7 +332,9 @@ video { } /* input */ -input { +input, +textarea, +select { --h: 36px; padding: var(--pad-2) calc(var(--pad-3) * 1.5); background: var(--color-raised); @@ -341,6 +348,7 @@ input { height: var(--h); line-height: var(--h); border-left: solid 0px transparent; + width: 100%; } input:not([type="checkbox"]):focus { @@ -354,7 +362,9 @@ input[data-invalid] { border-left: inset 5px var(--color-red); } -input.surface { +input.surface, +textarea.surface, +select.surface { background: var(--color-surface); } @@ -563,6 +573,7 @@ img { --size: 18px; width: var(--size); height: var(--size); + border-radius: var(--radius); aspect-ratio: 1 / 1; } diff --git a/app/templates_src/components.lisp b/app/templates_src/components.lisp new file mode 100644 index 0000000..d8a0f1d --- /dev/null +++ b/app/templates_src/components.lisp @@ -0,0 +1,9 @@ +(text "{% macro avatar(id, size=\"24px\") -%}") +(img + ("title" "User avatar") + ("src" "{{ config.service_hosts.buckets }}/avatars/{{ id }}") + ("alt" "User avatar") + ("class" "avatar shadow") + ("loading" "lazy") + ("style" "--size: {{ size }}")) +(text "{%- endmacro %}") diff --git a/app/templates_src/login.lisp b/app/templates_src/login.lisp index 47e51e8..9881054 100644 --- a/app/templates_src/login.lisp +++ b/app/templates_src/login.lisp @@ -3,7 +3,7 @@ (text "Login — {{ name }}")) (text "{% endblock %} {% block body %}") (div - ("class" "card") + ("class" "card container small") (h4 (text "Login with Tetratto")) (form @@ -24,7 +24,7 @@ ("id" "username"))) (div ("class" "flex flex_col gap_1") - (label ("for" "username") (b (text "Password"))) + (label ("for" "password") (b (text "Password"))) (input ("class" "surface") ("type" "password") diff --git a/app/templates_src/root.lisp b/app/templates_src/root.lisp index 42c42ce..87e742b 100644 --- a/app/templates_src/root.lisp +++ b/app/templates_src/root.lisp @@ -1,3 +1,4 @@ +(text "{%- import \"components.lisp\" as components -%}") (text "") (html ("lang" "en") @@ -35,8 +36,12 @@ (button ("onclick" "open_dropdown(event)") ("exclude" "dropdown") - ("class" "button camo fade") - (text "{{ icon \"menu\" }}")) + ("class" "button camo") + (text "{% if user -%}") + (text "{{ components::avatar(id=user.id) }}") + (text "{%- else -%}") + (text "{{ icon \"menu\" }}") + (text "{%- endif %}")) (div ("class" "inner left") (a diff --git a/src/database/messages.rs b/src/database/messages.rs index 8a38965..d49e145 100644 --- a/src/database/messages.rs +++ b/src/database/messages.rs @@ -1,6 +1,10 @@ use super::DataManager; -use crate::model::Message; -use oiseau::{PostgresRow, cache::Cache, execute, get, params}; +use crate::model::{Message, SocketMessage, SocketMethod}; +use oiseau::{ + PostgresRow, + cache::{Cache, redis::Commands}, + execute, get, params, +}; use tetratto_core::{ auto_method, model::{Error, Result, auth::User}, @@ -59,6 +63,21 @@ impl DataManager { return Err(Error::DatabaseError(e.to_string())); } + // send socket event + let mut sock_con = self.0.1.get_con().await; + + if let Err(e) = sock_con.publish::( + data.chat, + SocketMessage { + method: SocketMethod::MessageCreate, + body: serde_json::to_string(&data).unwrap(), + } + .to_string(), + ) { + return Err(Error::MiscError(e.to_string())); + } + + // .... Ok(data) } @@ -94,6 +113,20 @@ impl DataManager { } } + // send socket event + let mut sock_con = self.0.1.get_con().await; + + if let Err(e) = sock_con.publish::( + message.chat, + SocketMessage { + method: SocketMethod::MessageDelete, + body: message.id.to_string(), + } + .to_string(), + ) { + return Err(Error::MiscError(e.to_string())); + } + // ... Ok(()) } @@ -128,6 +161,20 @@ impl DataManager { return Err(Error::DatabaseError(e.to_string())); } + // send socket event + let mut sock_con = self.0.1.get_con().await; + + if let Err(e) = sock_con.publish::( + message.chat, + SocketMessage { + method: SocketMethod::MessageUpdate, + body: serde_json::to_string(&(message.id, content)).unwrap(), + } + .to_string(), + ) { + return Err(Error::MiscError(e.to_string())); + } + // ... Ok(()) } diff --git a/src/model.rs b/src/model.rs index adfcaa3..f77e64e 100644 --- a/src/model.rs +++ b/src/model.rs @@ -48,7 +48,7 @@ impl Chat { } } -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] pub struct Message { pub id: usize, pub created: usize, @@ -75,3 +75,29 @@ impl Message { } } } + +#[derive(Serialize, Deserialize, PartialEq, Eq)] +pub enum SocketMethod { + /// A message creation event. + MessageCreate, + /// A message deletion event. + MessageDelete, + /// A message update event. + MessageUpdate, + /// A chat update event. + ChatUpdate, + /// Simple ping. + Ping, +} + +#[derive(Serialize, Deserialize)] +pub struct SocketMessage { + pub method: SocketMethod, + pub body: String, +} + +impl SocketMessage { + pub fn to_string(&self) -> String { + serde_json::to_string(&self).unwrap() + } +} diff --git a/src/routes/api/chats.rs b/src/routes/api/chats.rs index e01dce5..294643b 100644 --- a/src/routes/api/chats.rs +++ b/src/routes/api/chats.rs @@ -1,11 +1,23 @@ use crate::{ - State, get_user_from_token, - model::{Chat, ChatStyle, GroupChatInfo}, + State, + database::DataManager, + get_user_from_token, + model::{Chat, ChatStyle, GroupChatInfo, SocketMessage, SocketMethod}, +}; +use axum::{ + Extension, Json, + extract::{ + Path, WebSocketUpgrade, + ws::{Message as WsMessage, WebSocket}, + }, + response::IntoResponse, }; -use axum::{Extension, Json, extract::Path, response::IntoResponse}; use axum_extra::extract::CookieJar; +use futures_util::{sink::SinkExt, stream::StreamExt}; +use oiseau::cache::{Cache, redis::Commands}; use serde::Deserialize; -use tetratto_core::model::{ApiReturn, Error}; +use std::time::Duration; +use tetratto_core::model::{ApiReturn, Error, auth::User}; #[derive(Deserialize)] pub struct CreateChat { @@ -135,3 +147,104 @@ pub async fn update_info_request( _ => return Json(Error::DoesNotSupportField("info".to_string()).into()), } } + +/// Handle a subscription to the websocket. +pub async fn subscription_handler( + jar: CookieJar, + ws: WebSocketUpgrade, + Extension(data): Extension, + Path(id): Path, +) -> impl IntoResponse { + let data = &(data.read().await).0; + let user = match get_user_from_token!(jar, data.2) { + Some(ua) => ua, + None => return Err(Error::NotAllowed.to_string()), + }; + + let data = data.clone(); + Ok(ws.on_upgrade(|socket| async move { + tokio::spawn(async move { + handle_socket(socket, data, id, user).await; + }); + })) +} + +pub async fn handle_socket(socket: WebSocket, db: DataManager, chat_id: String, user: User) { + let (mut sink, mut stream) = socket.split(); + + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(WsMessage::Text(text))) = stream.next().await { + if text != "Close" { + continue; + } + + // yes, this is an "unclean" disconnection from the socket... + // i don't care, it works + drop(stream); + break; + } + }); + + let dbc = db.clone(); + let chat_id_c = chat_id.clone(); + let mut redis_task = tokio::spawn(async move { + // forward messages from redis to the socket + let mut pubsub = dbc.0.1.client.get_async_pubsub().await.unwrap(); + + pubsub.subscribe(user.id).await.unwrap(); + pubsub.subscribe(chat_id_c).await.unwrap(); + + // listen for pubsub messages + let mut pubsub = pubsub.into_on_message(); + while let Some(msg) = pubsub.next().await { + // payload is a stringified SocketMessage + let smsg = msg.get_payload::().unwrap(); + let packet: SocketMessage = serde_json::from_str(&smsg).unwrap(); + + if packet.method == SocketMethod::Ping { + // forward with custom message + if sink.send(WsMessage::Text("Ping".into())).await.is_err() { + drop(sink); + break; + } + } else { + // forward to client + if sink.send(WsMessage::Text(smsg.into())).await.is_err() { + drop(sink); + break; + } + } + } + }); + + let db2c = db.0.1.clone(); + let heartbeat_task = tokio::spawn(async move { + let mut con = db2c.get_con().await; + let mut heartbeat = tokio::time::interval(Duration::from_secs(10)); + + loop { + con.publish::( + user.id, + SocketMessage { + method: SocketMethod::Ping, + body: "Ping".to_string(), + } + .to_string(), + ) + .unwrap(); + + heartbeat.tick().await; + } + }); + + tokio::select! { + _ = (&mut recv_task) => redis_task.abort(), + _ = (&mut redis_task) => recv_task.abort() + } + + heartbeat_task.abort(); // kill + db.0.1 + .decr("atto.active_connections:chats".to_string()) + .await; + tracing::info!("socket terminate"); +} diff --git a/src/routes/api/mod.rs b/src/routes/api/mod.rs index e4e911f..63b01fd 100644 --- a/src/routes/api/mod.rs +++ b/src/routes/api/mod.rs @@ -18,6 +18,7 @@ pub fn routes() -> Router { .route("/chats", post(chats::create_request)) .route("/chats/{id}/leave", post(chats::leave_request)) .route("/chats/{id}/info", post(chats::update_info_request)) + .route("/chats/{id}/_connect", post(chats::subscription_handler)) // messages .route("/messages", post(messages::create_request)) .route("/messages/{id}", delete(messages::delete_request))