add: channels, messages

This commit is contained in:
trisua 2025-04-27 23:11:37 -04:00
parent 67492cf73f
commit 7774124bd0
40 changed files with 2238 additions and 115 deletions

View file

@ -0,0 +1,204 @@
use axum::{Extension, Json, extract::Path, response::IntoResponse};
use axum_extra::extract::CookieJar;
use tetratto_core::model::{channels::Channel, ApiReturn, Error};
use crate::{
get_user_from_token,
routes::api::v1::{
CreateChannel, CreateGroupChannel, KickMember, UpdateChannelPosition, UpdateChannelTitle,
},
State,
};
pub async fn create_request(
jar: CookieJar,
Extension(data): Extension<State>,
Json(req): Json<CreateChannel>,
) -> impl IntoResponse {
let data = &(data.read().await).0;
let user = match get_user_from_token!(jar, data) {
Some(ua) => ua,
None => return Json(Error::NotAllowed.into()),
};
match data
.create_channel(Channel::new(
match req.community.parse::<usize>() {
Ok(c) => c,
Err(e) => return Json(Error::MiscError(e.to_string()).into()),
},
user.id,
0,
req.title,
))
.await
{
Ok(_) => Json(ApiReturn {
ok: true,
message: "Channel created".to_string(),
payload: (),
}),
Err(e) => Json(e.into()),
}
}
pub async fn create_group_request(
jar: CookieJar,
Extension(data): Extension<State>,
Json(req): Json<CreateGroupChannel>,
) -> impl IntoResponse {
let data = &(data.read().await).0;
let user = match get_user_from_token!(jar, data) {
Some(ua) => ua,
None => return Json(Error::NotAllowed.into()),
};
let mut members: Vec<usize> = Vec::new();
for member in req.members {
members.push(match member.parse::<usize>() {
Ok(c) => c,
Err(e) => return Json(Error::MiscError(e.to_string()).into()),
})
}
// check for existing
if members.len() == 1 {
let other_user = members.get(0).unwrap().to_owned();
if let Ok(channel) = data.get_channel_by_owner_member(user.id, other_user).await {
return Json(ApiReturn {
ok: true,
message: "Channel exists".to_string(),
payload: Some(channel.id.to_string()),
});
}
}
// check member permissions
for member in &members {
let other_user = match data.get_user_by_id(member.to_owned()).await {
Ok(ua) => ua,
Err(e) => return Json(e.into()),
};
if other_user.settings.private_chats {
if data
.get_userfollow_by_initiator_receiver(other_user.id, user.id)
.await
.is_err()
{
return Json(Error::NotAllowed.into());
}
}
}
// ...
let mut props = Channel::new(0, user.id, 0, req.title);
props.members = members;
let id = props.id.clone();
match data.create_channel(props).await {
Ok(_) => Json(ApiReturn {
ok: true,
message: "Channel created".to_string(),
payload: Some(id.to_string()),
}),
Err(e) => Json(e.into()),
}
}
pub async fn delete_request(
jar: CookieJar,
Extension(data): Extension<State>,
Path(id): Path<usize>,
) -> impl IntoResponse {
let data = &(data.read().await).0;
let user = match get_user_from_token!(jar, data) {
Some(ua) => ua,
None => return Json(Error::NotAllowed.into()),
};
match data.delete_channel(id, &user).await {
Ok(_) => Json(ApiReturn {
ok: true,
message: "Channel deleted".to_string(),
payload: (),
}),
Err(e) => Json(e.into()),
}
}
pub async fn update_title_request(
jar: CookieJar,
Extension(data): Extension<State>,
Path(id): Path<usize>,
Json(req): Json<UpdateChannelTitle>,
) -> impl IntoResponse {
let data = &(data.read().await).0;
let user = match get_user_from_token!(jar, data) {
Some(ua) => ua,
None => return Json(Error::NotAllowed.into()),
};
match data.update_channel_title(id, user, &req.title).await {
Ok(_) => Json(ApiReturn {
ok: true,
message: "Channel updated".to_string(),
payload: (),
}),
Err(e) => Json(e.into()),
}
}
pub async fn update_position_request(
jar: CookieJar,
Extension(data): Extension<State>,
Path(id): Path<usize>,
Json(req): Json<UpdateChannelPosition>,
) -> impl IntoResponse {
let data = &(data.read().await).0;
let user = match get_user_from_token!(jar, data) {
Some(ua) => ua,
None => return Json(Error::NotAllowed.into()),
};
match data.update_channel_position(id, user, req.position).await {
Ok(_) => Json(ApiReturn {
ok: true,
message: "Channel updated".to_string(),
payload: (),
}),
Err(e) => Json(e.into()),
}
}
pub async fn kick_member_request(
jar: CookieJar,
Extension(data): Extension<State>,
Path(id): Path<usize>,
Json(req): Json<KickMember>,
) -> impl IntoResponse {
let data = &(data.read().await).0;
let user = match get_user_from_token!(jar, data) {
Some(ua) => ua,
None => return Json(Error::NotAllowed.into()),
};
match data
.remove_channel_member(
id,
user,
match req.member.parse::<usize>() {
Ok(c) => c,
Err(e) => return Json(Error::MiscError(e.to_string()).into()),
},
)
.await
{
Ok(_) => Json(ApiReturn {
ok: true,
message: "Member removed".to_string(),
payload: (),
}),
Err(e) => Json(e.into()),
}
}

View file

@ -0,0 +1,236 @@
use axum::{
extract::{
ws::{Message as WsMessage, WebSocket, WebSocketUpgrade},
Path,
},
response::{IntoResponse, Response},
Extension, Json,
};
use axum_extra::extract::CookieJar;
use tetratto_core::{
cache::Cache,
model::{
auth::User,
channels::Message,
socket::{SocketMessage, SocketMethod},
ApiReturn, Error,
},
};
use std::sync::mpsc;
use crate::{get_user_from_token, routes::api::v1::CreateMessage, State};
use serde::Deserialize;
use futures_util::{sink::SinkExt, stream::StreamExt};
#[derive(Deserialize)]
pub struct SocketHeaders {
pub channel: String,
pub user: String,
}
/// Handle a subscription to the websocket.
pub async fn subscription_handler(
ws: WebSocketUpgrade,
Extension(data): Extension<State>,
Path(channel_id): Path<usize>,
) -> Response {
ws.on_upgrade(move |socket| handle_socket(socket, data, channel_id))
}
pub async fn handle_socket(socket: WebSocket, state: State, channel_id: usize) {
let db = &(state.read().await).0;
let db = db.clone();
let (mut sink, mut stream) = socket.split();
let (sender, receiver) = mpsc::channel::<String>();
// forward messages from mpsc to the sink
tokio::spawn(async move {
while let Ok(message) = receiver.recv() {
if message == "Close" {
sink.close().await.unwrap();
drop(receiver);
break;
}
if sink.send(message.into()).await.is_err() {
break;
}
}
});
// ...
let mut user: Option<User> = None;
let mut con = db.2.clone().get_con().await;
// handle incoming messages on socket
let dbc = db.clone();
let recv_sender = sender.clone();
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(WsMessage::Text(text))) = stream.next().await {
if text == "Pong" {
continue;
}
if text == "Close" {
recv_sender.send("Close".to_string()).unwrap();
break;
}
let data: SocketMessage = match serde_json::from_str(&text.to_string()) {
Ok(t) => t,
Err(_) => {
recv_sender.send("Close".to_string()).unwrap();
break;
}
};
if data.method != SocketMethod::Headers && user.is_none() {
// we've sent something else before authenticating... that's not right
recv_sender.send("Close".to_string()).unwrap();
break;
}
match data.method {
SocketMethod::Headers => {
let data: SocketHeaders = data.data();
user = Some(
match dbc
.get_user_by_id(match data.user.parse::<usize>() {
Ok(c) => c,
Err(_) => {
recv_sender.send("Close".to_string()).unwrap();
break;
}
})
.await
{
Ok(ua) => ua,
Err(_) => {
recv_sender.send("Close".to_string()).unwrap();
break;
}
},
);
let channel = match dbc
.get_channel_by_id(match data.channel.parse::<usize>() {
Ok(c) => c,
Err(_) => {
recv_sender.send("Close".to_string()).unwrap();
break;
}
})
.await
{
Ok(c) => c,
Err(_) => {
recv_sender.send("Close".to_string()).unwrap();
break;
}
};
let user = user.as_ref().unwrap();
let membership = match dbc
.get_membership_by_owner_community(user.id, channel.id)
.await
{
Ok(ua) => ua,
Err(_) => {
recv_sender.send("Close".to_string()).unwrap();
break;
}
};
if !channel.check_read(user.id, Some(membership.role)) {
recv_sender.send("Close".to_string()).unwrap();
break;
}
}
_ => {
recv_sender.send("Close".to_string()).unwrap();
break;
}
}
}
});
// forward messages from redis to the mpsc
let send_task_sender = sender.clone();
let mut send_task = tokio::spawn(async move {
let mut pubsub = con.as_pubsub();
pubsub.subscribe(channel_id).unwrap();
loop {
while let Ok(msg) = pubsub.get_message() {
// payload is a stringified SocketMessage
if send_task_sender.send(msg.get_payload().unwrap()).is_err() {
break;
}
}
}
});
// ...
let close_sender = sender.clone();
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => {
let _ = close_sender.send("Close".to_string());
send_task.abort()
},
};
}
pub async fn create_request(
jar: CookieJar,
Extension(data): Extension<State>,
Json(req): Json<CreateMessage>,
) -> impl IntoResponse {
let data = &(data.read().await).0;
let user = match get_user_from_token!(jar, data) {
Some(ua) => ua,
None => return Json(Error::NotAllowed.into()),
};
match data
.create_message(Message::new(
match req.channel.parse::<usize>() {
Ok(c) => c,
Err(e) => return Json(Error::MiscError(e.to_string()).into()),
},
user.id,
req.content,
))
.await
{
Ok(_) => Json(ApiReturn {
ok: true,
message: "Message created".to_string(),
payload: (),
}),
Err(e) => Json(e.into()),
}
}
pub async fn delete_request(
jar: CookieJar,
Extension(data): Extension<State>,
Path(id): Path<usize>,
) -> impl IntoResponse {
let data = &(data.read().await).0;
let user = match get_user_from_token!(jar, data) {
Some(ua) => ua,
None => return Json(Error::NotAllowed.into()),
};
match data.delete_message(id, user).await {
Ok(_) => Json(ApiReturn {
ok: true,
message: "Message deleted".to_string(),
payload: (),
}),
Err(e) => Json(e.into()),
}
}

View file

@ -0,0 +1,2 @@
pub mod channels;
pub mod messages;

View file

@ -6,9 +6,12 @@ pub mod reports;
pub mod requests;
pub mod util;
#[cfg(feature = "redis")]
pub mod channels;
use axum::{
routing::{any, delete, get, post},
Router,
routing::{delete, get, post},
};
use serde::Deserialize;
use tetratto_core::model::{
@ -266,6 +269,32 @@ pub fn routes() -> Router {
"/auth/user/connections/last_fm/api_proxy",
post(auth::connections::last_fm::proxy_request),
)
// channels
.route("/channels", post(channels::channels::create_request))
.route(
"/channels/group",
post(channels::channels::create_group_request),
)
.route(
"/channels/{id}/title",
post(channels::channels::update_title_request),
)
.route(
"/channels/{id}/move",
post(channels::channels::update_position_request),
)
.route("/channels/{id}", delete(channels::channels::delete_request))
.route(
"/channels/{id}/kick",
post(channels::channels::kick_member_request),
)
// messages
.route(
"/channels/{id}/ws",
any(channels::messages::subscription_handler),
)
.route("/messages", post(channels::messages::create_request))
.route("/messages/{id}", delete(channels::messages::delete_request))
}
#[derive(Deserialize)]
@ -419,3 +448,36 @@ pub struct CreateQuestion {
#[serde(default)]
pub community: String,
}
#[derive(Deserialize)]
pub struct CreateChannel {
pub title: String,
pub community: String,
}
#[derive(Deserialize)]
pub struct CreateGroupChannel {
pub title: String,
pub members: Vec<String>,
}
#[derive(Deserialize)]
pub struct UpdateChannelTitle {
pub title: String,
}
#[derive(Deserialize)]
pub struct UpdateChannelPosition {
pub position: i32,
}
#[derive(Deserialize)]
pub struct CreateMessage {
pub content: String,
pub channel: String,
}
#[derive(Deserialize)]
pub struct KickMember {
pub member: String,
}