use std::time::Duration; use crate::{ get_user_from_token, model::{ApiReturn, Error}, routes::api::v1::{ AppendAssociations, DeleteUser, DisableTotp, UpdateUserIsVerified, UpdateUserPassword, UpdateUserRole, UpdateUserUsername, }, State, }; use axum::{ extract::{ ws::{Message as WsMessage, WebSocket}, Path, WebSocketUpgrade, }, response::{IntoResponse, Redirect}, Extension, Json, }; use axum_extra::extract::CookieJar; use futures_util::{sink::SinkExt, stream::StreamExt}; use tetratto_core::{ cache::Cache, model::{ auth::{Token, UserSettings}, permissions::FinePermission, socket::{PacketType, SocketMessage, SocketMethod}, }, DataManager, }; #[cfg(feature = "redis")] use tetratto_core::cache::redis::Commands; use tetratto_shared::hash; pub async fn redirect_from_id( Extension(data): Extension, Path(id): Path, ) -> impl IntoResponse { match (data.read().await) .0 .get_user_by_id(match id.parse::() { Ok(id) => id, Err(_) => return Redirect::to("/"), }) .await { Ok(u) => Redirect::to(&format!("/@{}", u.username)), Err(_) => Redirect::to("/"), } } pub async fn redirect_from_ip( jar: CookieJar, Extension(data): Extension, Path(ip): Path, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Redirect::to("/"), }; if !user.permissions.check(FinePermission::MANAGE_BANS) { return Redirect::to("/"); } match data.get_user_by_token(&ip).await { Ok(u) => Redirect::to(&format!("/@{}", u.username)), Err(_) => Redirect::to("/"), } } pub async fn me_request(jar: CookieJar, Extension(data): Extension) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Json(Error::NotAllowed.into()), }; Json(ApiReturn { ok: true, message: "User exists".to_string(), payload: Some(user), }) } /// Update the settings of the given user. 
pub async fn update_user_settings_request( jar: CookieJar, Path(id): Path, Extension(data): Extension, Json(mut req): Json, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Json(Error::NotAllowed.into()), }; if user.id != id && !user.permissions.check(FinePermission::MANAGE_USERS) { return Json(Error::NotAllowed.into()); } // check lengths if req.display_name.len() > 32 { return Json(Error::DataTooLong("display name".to_string()).into()); } if req.warning.len() > 2048 { return Json(Error::DataTooLong("warning".to_string()).into()); } if req.status.len() > 256 { return Json(Error::DataTooLong("status".to_string()).into()); } if req.biography.len() > 4096 { return Json(Error::DataTooLong("warning".to_string()).into()); } // check percentage themes if !req.theme_sat.is_empty() && !req.theme_sat.ends_with("%") { req.theme_sat = format!("{}%", req.theme_sat) } if !req.theme_lit.is_empty() && !req.theme_lit.ends_with("%") { req.theme_lit = format!("{}%", req.theme_lit) } // ... match data.update_user_settings(id, req).await { Ok(_) => Json(ApiReturn { ok: true, message: "Settings updated".to_string(), payload: (), }), Err(e) => Json(e.into()), } } /// Append associations to the current user. 
pub async fn append_associations_request( jar: CookieJar, Extension(data): Extension, Json(req): Json, ) -> impl IntoResponse { let data = &(data.read().await).0; let mut user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Json(Error::NotAllowed.into()), }; // check existing associations to remove associations to deleted users // the user should take care of cleaning their ui themselves for (idx, id) in user.associated.clone().iter().enumerate() { if data.get_user_by_id(id.to_owned()).await.is_err() { user.associated.remove(idx); } } // resolve tokens for token in req.tokens { let hashed = hash::hash(token); let user_from_token = match data.get_user_by_token(&hashed).await { Ok(ua) => ua, Err(_) => continue, }; if user.associated.contains(&user_from_token.id) { // we already know about this; skip continue; } user.associated.push(user_from_token.id); } // ... match data.update_user_associated(user.id, user.associated).await { Ok(_) => Json(ApiReturn { ok: true, message: "Associations updated".to_string(), payload: (), }), Err(e) => Json(e.into()), } } /// Update the password of the given user. 
///
/// The caller must be the user identified by `id` or hold
/// `FinePermission::MANAGE_USERS`. The result of that permission check is also
/// handed to `update_user_password` as its final (`can_force`) argument.
pub async fn update_user_password_request(
    jar: CookieJar,
    Path(id): Path<usize>,
    Extension(data): Extension<State>,
    Json(req): Json<UpdateUserPassword>,
) -> impl IntoResponse {
    let data = &(data.read().await).0;

    let Some(user) = get_user_from_token!(jar, data) else {
        return Json(Error::NotAllowed.into());
    };

    let can_force = user.permissions.check(FinePermission::MANAGE_USERS);
    let is_self = user.id == id;

    if !(is_self || can_force) {
        return Json(Error::NotAllowed.into());
    }

    let outcome = data
        .update_user_password(id, req.from, req.to, user, can_force)
        .await;

    if let Err(e) = outcome {
        return Json(e.into());
    }

    Json(ApiReturn {
        ok: true,
        message: "Password updated".to_string(),
        payload: (),
    })
}

/// Update the username of the given user.
///
/// The caller must be the user identified by `id` or hold
/// `FinePermission::MANAGE_USERS`. If a user with the requested name can
/// already be loaded, `Error::UsernameInUse` is returned.
pub async fn update_user_username_request(
    jar: CookieJar,
    Path(id): Path<usize>,
    Extension(data): Extension<State>,
    Json(req): Json<UpdateUserUsername>,
) -> impl IntoResponse {
    let data = &(data.read().await).0;

    let Some(user) = get_user_from_token!(jar, data) else {
        return Json(Error::NotAllowed.into());
    };

    let is_self = user.id == id;
    if !is_self && !user.permissions.check(FinePermission::MANAGE_USERS) {
        return Json(Error::NotAllowed.into());
    }

    // a successful lookup means somebody already owns the requested name
    let name_taken = data.get_user_by_username(&req.to).await.is_ok();
    if name_taken {
        return Json(Error::UsernameInUse.into());
    }

    if let Err(e) = data.update_user_username(id, req.to, user).await {
        return Json(e.into());
    }

    Json(ApiReturn {
        ok: true,
        message: "Username updated".to_string(),
        payload: (),
    })
}

/// Update the tokens of the given user.
pub async fn update_user_tokens_request( jar: CookieJar, Path(id): Path, Extension(data): Extension, Json(req): Json>, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Json(Error::NotAllowed.into()), }; if user.id != id && !user.permissions.check(FinePermission::MANAGE_USERS) { return Json(Error::NotAllowed.into()); } match data.update_user_tokens(id, req).await { Ok(_) => Json(ApiReturn { ok: true, message: "Tokens updated".to_string(), payload: (), }), Err(e) => Json(e.into()), } } /// Update the verification status of the given user. pub async fn update_user_is_verified_request( jar: CookieJar, Path(id): Path, Extension(data): Extension, Json(req): Json, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Json(Error::NotAllowed.into()), }; match data .update_user_verified_status(id, req.is_verified, user) .await { Ok(_) => Json(ApiReturn { ok: true, message: "Verified status updated".to_string(), payload: (), }), Err(e) => Json(e.into()), } } /// Update the role of the given user. pub async fn update_user_role_request( jar: CookieJar, Path(id): Path, Extension(data): Extension, Json(req): Json, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Json(Error::NotAllowed.into()), }; match data.update_user_role(id, req.role, user, false).await { Ok(_) => Json(ApiReturn { ok: true, message: "User updated".to_string(), payload: (), }), Err(e) => Json(e.into()), } } /// Update the current user's last seen value. 
///
/// Responds with `Error::NotAllowed` when no user can be resolved from the
/// cookie jar; otherwise forwards the user to `seen_user`.
pub async fn seen_request(jar: CookieJar, Extension(data): Extension<State>) -> impl IntoResponse {
    let data = &(data.read().await).0;

    let Some(user) = get_user_from_token!(jar, data) else {
        return Json(Error::NotAllowed.into());
    };

    if let Err(e) = data.seen_user(&user).await {
        return Json(e.into());
    }

    Json(ApiReturn {
        ok: true,
        message: "User updated".to_string(),
        payload: (),
    })
}

/// Delete the given user.
///
/// The caller must be the user identified by `id` or hold
/// `FinePermission::MANAGE_USERS`. The password from the request body and the
/// result of the caller's `check_manager()` are forwarded to `delete_user`.
pub async fn delete_user_request(
    jar: CookieJar,
    Path(id): Path<usize>,
    Extension(data): Extension<State>,
    Json(req): Json<DeleteUser>,
) -> impl IntoResponse {
    let data = &(data.read().await).0;

    let Some(user) = get_user_from_token!(jar, data) else {
        return Json(Error::NotAllowed.into());
    };

    let is_self = user.id == id;
    if !is_self && !user.permissions.check(FinePermission::MANAGE_USERS) {
        return Json(Error::NotAllowed.into());
    }

    let is_manager = user.permissions.check_manager();

    if let Err(e) = data.delete_user(id, &req.password, is_manager).await {
        return Json(e.into());
    }

    Json(ApiReturn {
        ok: true,
        message: "User deleted".to_string(),
        payload: (),
    })
}

/// Enable TOTP for a user.
///
/// Only requires an authenticated caller; the caller is handed to
/// `enable_totp` along with the target `id`. On success, whatever
/// `enable_totp` returns is sent back as the payload.
pub async fn enable_totp_request(
    jar: CookieJar,
    Path(id): Path<usize>,
    Extension(data): Extension<State>,
) -> impl IntoResponse {
    let data = &(data.read().await).0;

    let Some(user) = get_user_from_token!(jar, data) else {
        return Json(Error::NotAllowed.into());
    };

    match data.enable_totp(id, user).await {
        Err(e) => Json(e.into()),
        Ok(enabled) => Json(ApiReturn {
            ok: true,
            message: "TOTP enabled".to_string(),
            payload: Some(enabled),
        }),
    }
}

/// Disable TOTP for a user.
pub async fn disable_totp_request( jar: CookieJar, Path(id): Path, Extension(data): Extension, Json(req): Json, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Json(Error::NotAllowed.into()), }; if user.id != id && !user.permissions.check(FinePermission::MANAGE_USERS) { return Json(Error::NotAllowed.into()); } // check totp code let other_user = match data.get_user_by_id(id).await { Ok(u) => u, Err(e) => return Json(e.into()), }; if !data.check_totp(&other_user, &req.totp) { return Json(Error::NotAllowed.into()); } // ... match data.update_user_totp(id, "", &Vec::new()).await { Ok(()) => Json(ApiReturn { ok: true, message: "TOTP disabled".to_string(), payload: (), }), Err(e) => Json(e.into()), } } /// Refresh TOTP recovery codes for a user. pub async fn refresh_totp_codes_request( jar: CookieJar, Path(id): Path, Extension(data): Extension, Json(req): Json, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Json(Error::NotAllowed.into()), }; if user.id != id && !user.permissions.check(FinePermission::MANAGE_USERS) { return Json(Error::NotAllowed.into()); } // check totp code let other_user = match data.get_user_by_id(id).await { Ok(u) => u, Err(e) => return Json(e.into()), }; if !data.check_totp(&other_user, &req.totp) { return Json(Error::NotAllowed.into()); } // ... let recovery_codes = DataManager::generate_totp_recovery_codes(); match data.update_user_totp(id, &user.totp, &recovery_codes).await { Ok(()) => Json(ApiReturn { ok: true, message: "Recovery codes refreshed".to_string(), payload: Some(recovery_codes), }), Err(e) => Json(e.into()), } } /// Check if the given user has TOTP enabled. 
pub async fn has_totp_enabled_request( Path(username): Path, Extension(data): Extension, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match data.get_user_by_username(&username).await { Ok(u) => u, Err(e) => return Json(e.into()), }; Json(ApiReturn { ok: true, message: "User exists".to_string(), payload: Some(!user.totp.is_empty()), }) } /// Handle a subscription to the websocket. #[cfg(feature = "redis")] pub async fn subscription_handler( jar: CookieJar, ws: WebSocketUpgrade, Extension(data): Extension, Path((user_id, id)): Path<(String, String)>, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Err("Socket refused"), }; if user.id.to_string() != user_id { // TODO: maybe allow moderators to connect anyway return Err("Socket refused (auth)"); } let data = data.clone(); Ok(ws.on_upgrade(|socket| async move { handle_socket(socket, data, user_id, id).await; })) } #[cfg(feature = "redis")] pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String, stream_id: String) { let (mut sink, mut stream) = socket.split(); let socket_id = tetratto_shared::hash::salt(); db.0.1 .incr("atto.active_connections:users".to_string()) .await; // get channel permissions let channel = format!("{user_id}/{stream_id}"); // identify socket sink.send(WsMessage::Text( serde_json::to_string(&SocketMessage { method: SocketMethod::Forward(PacketType::Key), data: socket_id.clone(), }) .unwrap() .into(), )) .await .unwrap(); // ... 
let mut recv_task = tokio::spawn(async move { while let Some(Ok(WsMessage::Text(text))) = stream.next().await { if text != "Close" { continue; } break; } }); let heartbeat_uri = format!("{channel}/{socket_id}"); let dbc = db.clone(); let channel_c = channel.clone(); let heartbeat_c = heartbeat_uri.clone(); let mut redis_task = tokio::spawn(async move { // forward messages from redis to the socket let mut pubsub = dbc.0.1.client.get_async_pubsub().await.unwrap(); pubsub.subscribe(channel_c).await.unwrap(); pubsub.subscribe(heartbeat_c).await.unwrap(); // listen for pubsub messages let mut pubsub = pubsub.into_on_message(); while let Some(msg) = pubsub.next().await { // payload is a stringified SocketMessage let smsg = msg.get_payload::().unwrap(); let packet: SocketMessage = serde_json::from_str(&smsg).unwrap(); if packet.method == SocketMethod::Forward(PacketType::Ping) { // forward with custom message if sink.send(WsMessage::Text("Ping".into())).await.is_err() { break; } } else if packet.method == SocketMethod::Message { if sink.send(WsMessage::Text(smsg.into())).await.is_err() { break; } } else { // forward to client if sink.send(WsMessage::Text(smsg.into())).await.is_err() { break; } } } }); let db2c = db.0.1.clone(); let heartbeat_c = heartbeat_uri.clone(); let heartbeat_task = tokio::spawn(async move { let mut con = db2c.get_con().await; let mut heartbeat = tokio::time::interval(Duration::from_secs(10)); loop { con.publish::<&str, String, ()>( &heartbeat_c, serde_json::to_string(&SocketMessage { method: SocketMethod::Forward(PacketType::Ping), data: "Ping".to_string(), }) .unwrap(), ) .unwrap(); heartbeat.tick().await; } }); tokio::select! 
{ _ = (&mut recv_task) => redis_task.abort(), _ = (&mut redis_task) => recv_task.abort() } heartbeat_task.abort(); // kill db.0.1 .decr("atto.active_connections:users".to_string()) .await; tracing::info!("socket terminate"); } pub async fn post_to_socket_request( jar: CookieJar, Extension(data): Extension, Path((user_id, id)): Path<(String, String)>, Json(msg): Json, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Json(Error::NotAllowed.into()), }; if user.id.to_string() != user_id { return Json(Error::NotAllowed.into()); } let mut con = data.0.1.get_con().await; con.publish::( format!("{user_id}/{id}"), serde_json::to_string(&msg).unwrap(), ) .unwrap(); Json(ApiReturn { ok: true, message: "Data sent to socket".to_string(), payload: (), }) } /// Calculate the user's great post average. pub async fn get_user_gpa_request( jar: CookieJar, Path(id): Path, Extension(data): Extension, ) -> impl IntoResponse { let data = &(data.read().await).0; let user = match get_user_from_token!(jar, data) { Some(ua) => ua, None => return Json(Error::NotAllowed.into()), }; if !user.permissions.check(FinePermission::MANAGE_USERS) { return Json(Error::NotAllowed.into()); } let gpa = data.calculate_user_gpa(id).await; return Json(ApiReturn { ok: true, message: if gpa >= 3.0 { "cool".to_string() } else if gpa >= 4.0 { "extraordinary".to_string() } else { "ok".to_string() }, payload: Some(gpa), }); }