add: socket

This commit is contained in:
trisua 2025-08-24 17:58:39 -04:00
parent 8c86dd6cda
commit c48cf78314
10 changed files with 227 additions and 13 deletions

1
Cargo.lock generated
View file

@ -2923,6 +2923,7 @@ dependencies = [
"axum-image",
"buckets-core",
"dotenv",
"futures-util",
"glob",
"nanoneo",
"oiseau",

View file

@ -33,3 +33,4 @@ regex = "1.11.1"
oiseau = { version = "0.1.2", default-features = false, features = ["postgres", "redis",] }
buckets-core = "1.0.4"
axum-image = "0.1.1"
futures-util = "0.3.31"

View file

@ -140,6 +140,11 @@ nav.sticky {
width: 100%;
}
.container.small {
margin: 0 auto;
max-width: 42ch;
}
.content_container {
margin: 0 auto var(--pad-2);
width: 100%;
@ -327,7 +332,9 @@ video {
}
/* input */
input {
input,
textarea,
select {
--h: 36px;
padding: var(--pad-2) calc(var(--pad-3) * 1.5);
background: var(--color-raised);
@ -341,6 +348,7 @@ input {
height: var(--h);
line-height: var(--h);
border-left: solid 0px transparent;
width: 100%;
}
input:not([type="checkbox"]):focus {
@ -354,7 +362,9 @@ input[data-invalid] {
border-left: inset 5px var(--color-red);
}
input.surface {
input.surface,
textarea.surface,
select.surface {
background: var(--color-surface);
}
@ -563,6 +573,7 @@ img {
--size: 18px;
width: var(--size);
height: var(--size);
border-radius: var(--radius);
aspect-ratio: 1 / 1;
}

View file

@ -0,0 +1,9 @@
(text "{% macro avatar(id, size=\"24px\") -%}")
(img
("title" "User avatar")
("src" "{{ config.service_hosts.buckets }}/avatars/{{ id }}")
("alt" "User avatar")
("class" "avatar shadow")
("loading" "lazy")
("style" "--size: {{ size }}"))
(text "{%- endmacro %}")

View file

@ -3,7 +3,7 @@
(text "Login — {{ name }}"))
(text "{% endblock %} {% block body %}")
(div
("class" "card")
("class" "card container small")
(h4 (text "Login with Tetratto"))
(form
@ -24,7 +24,7 @@
("id" "username")))
(div
("class" "flex flex_col gap_1")
(label ("for" "username") (b (text "Password")))
(label ("for" "password") (b (text "Password")))
(input
("class" "surface")
("type" "password")

View file

@ -1,3 +1,4 @@
(text "{%- import \"components.lisp\" as components -%}")
(text "<!doctype html>")
(html
("lang" "en")
@ -35,8 +36,12 @@
(button
("onclick" "open_dropdown(event)")
("exclude" "dropdown")
("class" "button camo fade")
(text "{{ icon \"menu\" }}"))
("class" "button camo")
(text "{% if user -%}")
(text "{{ components::avatar(id=user.id) }}")
(text "{%- else -%}")
(text "{{ icon \"menu\" }}")
(text "{%- endif %}"))
(div
("class" "inner left")
(a

View file

@ -1,6 +1,10 @@
use super::DataManager;
use crate::model::Message;
use oiseau::{PostgresRow, cache::Cache, execute, get, params};
use crate::model::{Message, SocketMessage, SocketMethod};
use oiseau::{
PostgresRow,
cache::{Cache, redis::Commands},
execute, get, params,
};
use tetratto_core::{
auto_method,
model::{Error, Result, auth::User},
@ -59,6 +63,21 @@ impl DataManager {
return Err(Error::DatabaseError(e.to_string()));
}
// send socket event
let mut sock_con = self.0.1.get_con().await;
if let Err(e) = sock_con.publish::<usize, String, ()>(
data.chat,
SocketMessage {
method: SocketMethod::MessageCreate,
body: serde_json::to_string(&data).unwrap(),
}
.to_string(),
) {
return Err(Error::MiscError(e.to_string()));
}
// ....
Ok(data)
}
@ -94,6 +113,20 @@ impl DataManager {
}
}
// send socket event
let mut sock_con = self.0.1.get_con().await;
if let Err(e) = sock_con.publish::<usize, String, ()>(
message.chat,
SocketMessage {
method: SocketMethod::MessageDelete,
body: message.id.to_string(),
}
.to_string(),
) {
return Err(Error::MiscError(e.to_string()));
}
// ...
Ok(())
}
@ -128,6 +161,20 @@ impl DataManager {
return Err(Error::DatabaseError(e.to_string()));
}
// send socket event
let mut sock_con = self.0.1.get_con().await;
if let Err(e) = sock_con.publish::<usize, String, ()>(
message.chat,
SocketMessage {
method: SocketMethod::MessageUpdate,
body: serde_json::to_string(&(message.id, content)).unwrap(),
}
.to_string(),
) {
return Err(Error::MiscError(e.to_string()));
}
// ...
Ok(())
}

View file

@ -48,7 +48,7 @@ impl Chat {
}
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct Message {
pub id: usize,
pub created: usize,
@ -75,3 +75,29 @@ impl Message {
}
}
}
#[derive(Serialize, Deserialize, PartialEq, Eq)]
pub enum SocketMethod {
/// A message creation event.
MessageCreate,
/// A message deletion event.
MessageDelete,
/// A message update event.
MessageUpdate,
/// A chat update event.
ChatUpdate,
/// Simple ping.
Ping,
}
#[derive(Serialize, Deserialize)]
pub struct SocketMessage {
pub method: SocketMethod,
pub body: String,
}
impl SocketMessage {
pub fn to_string(&self) -> String {
serde_json::to_string(&self).unwrap()
}
}

View file

@ -1,11 +1,23 @@
use crate::{
State, get_user_from_token,
model::{Chat, ChatStyle, GroupChatInfo},
State,
database::DataManager,
get_user_from_token,
model::{Chat, ChatStyle, GroupChatInfo, SocketMessage, SocketMethod},
};
use axum::{
Extension, Json,
extract::{
Path, WebSocketUpgrade,
ws::{Message as WsMessage, WebSocket},
},
response::IntoResponse,
};
use axum::{Extension, Json, extract::Path, response::IntoResponse};
use axum_extra::extract::CookieJar;
use futures_util::{sink::SinkExt, stream::StreamExt};
use oiseau::cache::{Cache, redis::Commands};
use serde::Deserialize;
use tetratto_core::model::{ApiReturn, Error};
use std::time::Duration;
use tetratto_core::model::{ApiReturn, Error, auth::User};
#[derive(Deserialize)]
pub struct CreateChat {
@ -135,3 +147,104 @@ pub async fn update_info_request(
_ => return Json(Error::DoesNotSupportField("info".to_string()).into()),
}
}
/// Handle a subscription to the websocket.
pub async fn subscription_handler(
jar: CookieJar,
ws: WebSocketUpgrade,
Extension(data): Extension<State>,
Path(id): Path<String>,
) -> impl IntoResponse {
let data = &(data.read().await).0;
let user = match get_user_from_token!(jar, data.2) {
Some(ua) => ua,
None => return Err(Error::NotAllowed.to_string()),
};
let data = data.clone();
Ok(ws.on_upgrade(|socket| async move {
tokio::spawn(async move {
handle_socket(socket, data, id, user).await;
});
}))
}
pub async fn handle_socket(socket: WebSocket, db: DataManager, chat_id: String, user: User) {
let (mut sink, mut stream) = socket.split();
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(WsMessage::Text(text))) = stream.next().await {
if text != "Close" {
continue;
}
// yes, this is an "unclean" disconnection from the socket...
// i don't care, it works
drop(stream);
break;
}
});
let dbc = db.clone();
let chat_id_c = chat_id.clone();
let mut redis_task = tokio::spawn(async move {
// forward messages from redis to the socket
let mut pubsub = dbc.0.1.client.get_async_pubsub().await.unwrap();
pubsub.subscribe(user.id).await.unwrap();
pubsub.subscribe(chat_id_c).await.unwrap();
// listen for pubsub messages
let mut pubsub = pubsub.into_on_message();
while let Some(msg) = pubsub.next().await {
// payload is a stringified SocketMessage
let smsg = msg.get_payload::<String>().unwrap();
let packet: SocketMessage = serde_json::from_str(&smsg).unwrap();
if packet.method == SocketMethod::Ping {
// forward with custom message
if sink.send(WsMessage::Text("Ping".into())).await.is_err() {
drop(sink);
break;
}
} else {
// forward to client
if sink.send(WsMessage::Text(smsg.into())).await.is_err() {
drop(sink);
break;
}
}
}
});
let db2c = db.0.1.clone();
let heartbeat_task = tokio::spawn(async move {
let mut con = db2c.get_con().await;
let mut heartbeat = tokio::time::interval(Duration::from_secs(10));
loop {
con.publish::<usize, String, ()>(
user.id,
SocketMessage {
method: SocketMethod::Ping,
body: "Ping".to_string(),
}
.to_string(),
)
.unwrap();
heartbeat.tick().await;
}
});
tokio::select! {
_ = (&mut recv_task) => redis_task.abort(),
_ = (&mut redis_task) => recv_task.abort()
}
heartbeat_task.abort(); // kill
db.0.1
.decr("atto.active_connections:chats".to_string())
.await;
tracing::info!("socket terminate");
}

View file

@ -18,6 +18,7 @@ pub fn routes() -> Router {
.route("/chats", post(chats::create_request))
.route("/chats/{id}/leave", post(chats::leave_request))
.route("/chats/{id}/info", post(chats::update_info_request))
.route("/chats/{id}/_connect", post(chats::subscription_handler))
// messages
.route("/messages", post(messages::create_request))
.route("/messages/{id}", delete(messages::delete_request))