-
+ {% if not grouped %}
+
{{ self::full_username(user=user) }} {% if message.edited !=
message.created %}
@@ -930,30 +955,16 @@ message, can_manage_message=false) -%}
- {% if can_manage_message or (user and user.id == message.owner)
- %}
-
-
-
-
-
-
-
- {% endif %}
+ {{ self::message_actions(user=user, message=message,
+ can_manage_message=can_manage_message) }}
+ {% else %}
+
+ {{ self::message_actions(user=user, message=message,
+ can_manage_message=can_manage_message) }}
+
+ {% endif %}
{{ message.content|markdown|safe }}
diff --git a/crates/app/src/routes/api/v1/auth/profile.rs b/crates/app/src/routes/api/v1/auth/profile.rs
index 6fd0bff..e68dc65 100644
--- a/crates/app/src/routes/api/v1/auth/profile.rs
+++ b/crates/app/src/routes/api/v1/auth/profile.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
use crate::{
get_user_from_token,
model::{ApiReturn, Error},
@@ -8,19 +10,28 @@ use crate::{
State,
};
use axum::{
- Extension, Json,
- extract::Path,
+ extract::{
+ ws::{Message as WsMessage, WebSocket},
+ Path, WebSocketUpgrade,
+ },
response::{IntoResponse, Redirect},
+ Extension, Json,
};
use axum_extra::extract::CookieJar;
+use futures_util::{sink::SinkExt, stream::StreamExt};
use tetratto_core::{
+ cache::Cache,
model::{
auth::{Token, UserSettings},
permissions::FinePermission,
+ socket::{PacketType, SocketMessage, SocketMethod},
},
DataManager,
};
+#[cfg(feature = "redis")]
+use redis::Commands;
+
pub async fn redirect_from_id(
Extension(data): Extension
,
Path(id): Path,
@@ -410,3 +421,114 @@ pub async fn has_totp_enabled_request(
payload: Some(!user.totp.is_empty()),
})
}
+
+/// Handle a subscription to the websocket.
+#[cfg(feature = "redis")]
+pub async fn subscription_handler(
+ jar: CookieJar,
+ ws: WebSocketUpgrade,
+ Extension(data): Extension,
+ Path((user_id, id)): Path<(String, String)>,
+) -> impl IntoResponse {
+ let data = &(data.read().await).0;
+ let user = match get_user_from_token!(jar, data) {
+ Some(ua) => ua,
+ None => return Err("Socket refused"),
+ };
+
+ if user.id.to_string() != user_id {
+ // TODO: maybe allow moderators to connect anyway
+ return Err("Socket refused (auth)");
+ }
+
+ let data = data.clone();
+ Ok(ws.on_upgrade(|socket| async move {
+ tokio::spawn(async move {
+ handle_socket(socket, data, user_id, id).await;
+ });
+ }))
+}
+
+#[cfg(feature = "redis")]
+pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String, stream_id: String) {
+ let (mut sink, mut stream) = socket.split();
+
+ // get channel permissions
+ let channel = format!("{user_id}_{stream_id}");
+
+ // ...
+ let mut recv_task = tokio::spawn(async move {
+ while let Some(Ok(WsMessage::Text(text))) = stream.next().await {
+ if text != "Close" {
+ continue;
+ }
+
+ drop(stream);
+ break;
+ }
+ });
+
+ let dbc = db.clone();
+ let channel_c = channel.clone();
+ let mut redis_task = tokio::spawn(async move {
+ // forward messages from redis to the socket
+ let mut con = dbc.2.get_con().await;
+ let mut pubsub = con.as_pubsub();
+ pubsub.subscribe(channel_c).unwrap();
+
+ // listen for pubsub messages
+ while let Ok(msg) = pubsub.get_message() {
+ // payload is a stringified SocketMessage
+ let smsg = msg.get_payload::().unwrap();
+ let packet: SocketMessage = serde_json::from_str(&smsg).unwrap();
+
+ if packet.method == SocketMethod::Forward(PacketType::Ping) {
+ // forward with custom message
+ if sink.send(WsMessage::Text("Ping".into())).await.is_err() {
+ drop(sink);
+ break;
+ }
+ } else if packet.method == SocketMethod::Message {
+ if sink.send(WsMessage::Text(smsg.into())).await.is_err() {
+ drop(sink);
+ break;
+ }
+ } else {
+ // forward to client
+ if sink.send(WsMessage::Text(smsg.into())).await.is_err() {
+ drop(sink);
+ break;
+ }
+ }
+ }
+ });
+
+ let db2c = db.2.clone();
+ let channel_c = channel.clone();
+ let heartbeat_task = tokio::spawn(async move {
+ let mut con = db2c.get_con().await;
+ let mut heartbeat = tokio::time::interval(Duration::from_secs(10));
+
+ loop {
+ con.publish::(
+ format!("{channel_c}_heartbeat"),
+ serde_json::to_string(&SocketMessage {
+ method: SocketMethod::Forward(PacketType::Ping),
+ data: "Ping".to_string(),
+ })
+ .unwrap(),
+ )
+ .unwrap();
+
+ heartbeat.tick().await;
+ }
+ });
+
+ tokio::select! {
+ _ = (&mut recv_task) => redis_task.abort(),
+ _ = (&mut redis_task) => recv_task.abort()
+ }
+
+ heartbeat_task.abort(); // kill
+ tracing::info!("socket terminate");
+}
diff --git a/crates/app/src/routes/api/v1/channels/messages.rs b/crates/app/src/routes/api/v1/channels/messages.rs
index 7fe47b8..0124e6e 100644
--- a/crates/app/src/routes/api/v1/channels/messages.rs
+++ b/crates/app/src/routes/api/v1/channels/messages.rs
@@ -186,7 +186,7 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, community_id: Str
let dbc = db.clone();
let mut redis_task = tokio::spawn(async move {
- // forward messages from redis to the mpsc
+ // forward messages from redis to the socket
let mut con = dbc.2.get_con().await;
let mut pubsub = con.as_pubsub();
diff --git a/crates/app/src/routes/api/v1/mod.rs b/crates/app/src/routes/api/v1/mod.rs
index 08f011d..5ccbde7 100644
--- a/crates/app/src/routes/api/v1/mod.rs
+++ b/crates/app/src/routes/api/v1/mod.rs
@@ -191,6 +191,10 @@ pub fn routes() -> Router {
get(auth::profile::redirect_from_ip),
)
.route("/auth/ip/{ip}/block", post(auth::social::ip_block_request))
+ .route(
+ "/auth/user/{id}/_connect/{stream}",
+ get(auth::profile::subscription_handler),
+ )
// warnings
.route("/warnings/{id}", post(auth::user_warnings::create_request))
.route(
diff --git a/crates/core/src/database/messages.rs b/crates/core/src/database/messages.rs
index d9f7139..9993524 100644
--- a/crates/core/src/database/messages.rs
+++ b/crates/core/src/database/messages.rs
@@ -46,15 +46,23 @@ impl DataManager {
auto_method!(get_message_by_id(usize as i64)@get_message_from_row -> "SELECT * FROM messages WHERE id = $1" --name="message" --returns=Message --cache-key-tmpl="atto.message:{}");
/// Complete a vector of just messages with their owner as well.
+ ///
+ /// # Returns
+ /// `(message, owner, group with previous messages in ui)`
pub async fn fill_messages(
&self,
messages: Vec,
ignore_users: &[usize],
- ) -> Result> {
- let mut out: Vec<(Message, User)> = Vec::new();
+ ) -> Result> {
+ let mut out: Vec<(Message, User, bool)> = Vec::new();
let mut users: HashMap = HashMap::new();
- for message in messages {
+ for (i, message) in messages.iter().enumerate() {
+ let next_owner: usize = match messages.get(i + 1) {
+ Some(ref m) => m.owner,
+ None => 0,
+ };
+
let owner = message.owner;
if ignore_users.contains(&owner) {
@@ -62,11 +70,11 @@ impl DataManager {
}
if let Some(user) = users.get(&owner) {
- out.push((message, user.clone()));
+ out.push((message.to_owned(), user.clone(), next_owner == owner));
} else {
let user = self.get_user_by_id(owner).await?;
users.insert(owner, user.clone());
- out.push((message, user));
+ out.push((message.to_owned(), user, next_owner == owner));
}
}
diff --git a/crates/core/src/model/channels.rs b/crates/core/src/model/channels.rs
index f5ac557..740a0d6 100644
--- a/crates/core/src/model/channels.rs
+++ b/crates/core/src/model/channels.rs
@@ -68,7 +68,7 @@ impl Channel {
}
}
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
pub struct Message {
pub id: usize,
pub channel: usize,
@@ -98,7 +98,7 @@ impl Message {
}
}
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
pub struct MessageContext;
impl Default for MessageContext {