From 094dd5fdb532116e4b2bdd371b7fe4d109436b34 Mon Sep 17 00:00:00 2001 From: trisua Date: Thu, 1 May 2025 16:43:58 -0400 Subject: [PATCH] add: user socket streams add: group messages by author in ui TODO: group messages by author in ui as they come in from socket TODO: notifications stream connection --- crates/app/src/public/html/chats/app.html | 19 ++- crates/app/src/public/html/chats/stream.html | 2 +- crates/app/src/public/html/components.html | 63 +++++---- crates/app/src/routes/api/v1/auth/profile.rs | 126 +++++++++++++++++- .../src/routes/api/v1/channels/messages.rs | 2 +- crates/app/src/routes/api/v1/mod.rs | 4 + crates/core/src/database/messages.rs | 18 ++- crates/core/src/model/channels.rs | 4 +- 8 files changed, 198 insertions(+), 40 deletions(-) diff --git a/crates/app/src/public/html/chats/app.html b/crates/app/src/public/html/chats/app.html index e9e0f94..58a9dec 100644 --- a/crates/app/src/public/html/chats/app.html +++ b/crates/app/src/public/html/chats/app.html @@ -65,14 +65,14 @@
+ {% if selected_community != 0 %} {{ icon "book-heart" }} {{ text "communities:label.show_community" }} - - {% if can_manage_channels %} + {% endif %} {% if can_manage_channels %} {{ icon "settings" }} {{ text "general:action.manage" }} @@ -104,7 +104,6 @@
0 %} diff --git a/crates/app/src/public/html/components.html b/crates/app/src/public/html/components.html index d737feb..e8d4b98 100644 --- a/crates/app/src/public/html/components.html +++ b/crates/app/src/public/html/components.html @@ -909,15 +909,40 @@ if state and state.data %}
{%- endmacro %} {% macro connection_url(key, value) -%} {% if value[0].data.url %} {{ value[0].data.url }} {% elif key == "LastFm" %} https://last.fm/user/{{ -value[0].data.name }} {% endif %} {%- endmacro %} {% macro message(user, -message, can_manage_message=false) -%} -
+value[0].data.name }} {% endif %} {%- endmacro %} {% macro +message_actions(can_manage_message, user, message) -%} {% if can_manage_message +or (user and user.id == message.owner) %} + +{% endif %} {%- endmacro %} {% macro message(user, message, +can_manage_message=false, grouped=false) -%} +
+ {% if not grouped %} {{ self::avatar(username=user.username, size="52px") }} + {% endif %}
-
+ {% if not grouped %} +
{{ self::full_username(user=user) }} {% if message.edited != message.created %} @@ -930,30 +955,16 @@ message, can_manage_message=false) -%}
+ {% else %} + + {% endif %} {{ message.content|markdown|safe }}
diff --git a/crates/app/src/routes/api/v1/auth/profile.rs b/crates/app/src/routes/api/v1/auth/profile.rs index 6fd0bff..e68dc65 100644 --- a/crates/app/src/routes/api/v1/auth/profile.rs +++ b/crates/app/src/routes/api/v1/auth/profile.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::{ get_user_from_token, model::{ApiReturn, Error}, @@ -8,19 +10,28 @@ use crate::{ State, }; use axum::{ - Extension, Json, - extract::Path, + extract::{ + ws::{Message as WsMessage, WebSocket}, + Path, WebSocketUpgrade, + }, response::{IntoResponse, Redirect}, + Extension, Json, }; use axum_extra::extract::CookieJar; +use futures_util::{sink::SinkExt, stream::StreamExt}; use tetratto_core::{ + cache::Cache, model::{ auth::{Token, UserSettings}, permissions::FinePermission, + socket::{PacketType, SocketMessage, SocketMethod}, }, DataManager, }; +#[cfg(feature = "redis")] +use redis::Commands; + pub async fn redirect_from_id( Extension(data): Extension, Path(id): Path, @@ -410,3 +421,114 @@ pub async fn has_totp_enabled_request( payload: Some(!user.totp.is_empty()), }) } + +/// Handle a subscription to the websocket. +#[cfg(feature = "redis")] +pub async fn subscription_handler( + jar: CookieJar, + ws: WebSocketUpgrade, + Extension(data): Extension, + Path((user_id, id)): Path<(String, String)>, +) -> impl IntoResponse { + let data = &(data.read().await).0; + let user = match get_user_from_token!(jar, data) { + Some(ua) => ua, + None => return Err("Socket refused"), + }; + + if user.id.to_string() != user_id { + // TODO: maybe allow moderators to connect anyway + return Err("Socket refused (auth)"); + } + + let data = data.clone(); + Ok(ws.on_upgrade(|socket| async move { + tokio::spawn(async move { + handle_socket(socket, data, user_id, id).await; + }); + })) +} + +#[cfg(feature = "redis")] +pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String, stream_id: String) { + let (mut sink, mut stream) = socket.split(); + + // get channel permissions + let channel = format!("{user_id}_{stream_id}"); + + // ... + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(WsMessage::Text(text))) = stream.next().await { + if text != "Close" { + continue; + } + + drop(stream); + break; + } + }); + + let dbc = db.clone(); + let channel_c = channel.clone(); + let mut redis_task = tokio::spawn(async move { + // forward messages from redis to the socket + let mut con = dbc.2.get_con().await; + let mut pubsub = con.as_pubsub(); + pubsub.subscribe(channel_c).unwrap(); + + // listen for pubsub messages + while let Ok(msg) = pubsub.get_message() { + // payload is a stringified SocketMessage + let smsg = msg.get_payload::().unwrap(); + let packet: SocketMessage = serde_json::from_str(&smsg).unwrap(); + + if packet.method == SocketMethod::Forward(PacketType::Ping) { + // forward with custom message + if sink.send(WsMessage::Text("Ping".into())).await.is_err() { + drop(sink); + break; + } + } else if packet.method == SocketMethod::Message { + if sink.send(WsMessage::Text(smsg.into())).await.is_err() { + drop(sink); + break; + } + } else { + // forward to client + if sink.send(WsMessage::Text(smsg.into())).await.is_err() { + drop(sink); + break; + } + } + } + }); + + let db2c = db.2.clone(); + let channel_c = channel.clone(); + let heartbeat_task = tokio::spawn(async move { + let mut con = db2c.get_con().await; + let mut heartbeat = tokio::time::interval(Duration::from_secs(10)); + + loop { + con.publish::( + format!("{channel_c}_heartbeat"), + serde_json::to_string(&SocketMessage { + method: SocketMethod::Forward(PacketType::Ping), + data: "Ping".to_string(), + }) + .unwrap(), + ) + .unwrap(); + + heartbeat.tick().await; + } + }); + + tokio::select! { + _ = (&mut recv_task) => redis_task.abort(), + _ = (&mut redis_task) => recv_task.abort() + } + + heartbeat_task.abort(); // kill + tracing::info!("socket terminate"); +} diff --git a/crates/app/src/routes/api/v1/channels/messages.rs b/crates/app/src/routes/api/v1/channels/messages.rs index 7fe47b8..0124e6e 100644 --- a/crates/app/src/routes/api/v1/channels/messages.rs +++ b/crates/app/src/routes/api/v1/channels/messages.rs @@ -186,7 +186,7 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, community_id: Str let dbc = db.clone(); let mut redis_task = tokio::spawn(async move { - // forward messages from redis to the mpsc + // forward messages from redis to the socket let mut con = dbc.2.get_con().await; let mut pubsub = con.as_pubsub(); diff --git a/crates/app/src/routes/api/v1/mod.rs b/crates/app/src/routes/api/v1/mod.rs index 08f011d..5ccbde7 100644 --- a/crates/app/src/routes/api/v1/mod.rs +++ b/crates/app/src/routes/api/v1/mod.rs @@ -191,6 +191,10 @@ pub fn routes() -> Router { get(auth::profile::redirect_from_ip), ) .route("/auth/ip/{ip}/block", post(auth::social::ip_block_request)) + .route( + "/auth/user/{id}/_connect/{stream}", + get(auth::profile::subscription_handler), + ) // warnings .route("/warnings/{id}", post(auth::user_warnings::create_request)) .route( diff --git a/crates/core/src/database/messages.rs b/crates/core/src/database/messages.rs index d9f7139..9993524 100644 --- a/crates/core/src/database/messages.rs +++ b/crates/core/src/database/messages.rs @@ -46,15 +46,23 @@ impl DataManager { auto_method!(get_message_by_id(usize as i64)@get_message_from_row -> "SELECT * FROM messages WHERE id = $1" --name="message" --returns=Message --cache-key-tmpl="atto.message:{}"); /// Complete a vector of just messages with their owner as well. + /// + /// # Returns + /// `(message, owner, group with previous messages in ui)` pub async fn fill_messages( &self, messages: Vec, ignore_users: &[usize], - ) -> Result> { - let mut out: Vec<(Message, User)> = Vec::new(); + ) -> Result> { + let mut out: Vec<(Message, User, bool)> = Vec::new(); let mut users: HashMap = HashMap::new(); - for message in messages { + for (i, message) in messages.iter().enumerate() { + let next_owner: usize = match messages.get(i + 1) { + Some(ref m) => m.owner, + None => 0, + }; + let owner = message.owner; if ignore_users.contains(&owner) { @@ -62,11 +70,11 @@ impl DataManager { } if let Some(user) = users.get(&owner) { - out.push((message, user.clone())); + out.push((message.to_owned(), user.clone(), next_owner == owner)); } else { let user = self.get_user_by_id(owner).await?; users.insert(owner, user.clone()); - out.push((message, user)); + out.push((message.to_owned(), user, next_owner == owner)); } } diff --git a/crates/core/src/model/channels.rs b/crates/core/src/model/channels.rs index f5ac557..740a0d6 100644 --- a/crates/core/src/model/channels.rs +++ b/crates/core/src/model/channels.rs @@ -68,7 +68,7 @@ impl Channel { } } -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] pub struct Message { pub id: usize, pub channel: usize, @@ -98,7 +98,7 @@ impl Message { } } -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] pub struct MessageContext; impl Default for MessageContext {