2025-04-27 23:11:37 -04:00
use std ::collections ::HashMap ;
2025-04-25 16:22:38 -04:00
use super ::* ;
use crate ::cache ::Cache ;
use crate ::model ::moderation ::AuditLogEntry ;
2025-04-27 23:11:37 -04:00
use crate ::model ::socket ::{ SocketMessage , SocketMethod } ;
2025-04-25 16:22:38 -04:00
use crate ::model ::{
Error , Result , auth ::User , permissions ::FinePermission ,
communities_permissions ::CommunityPermission , channels ::Message ,
} ;
use crate ::{ auto_method , execute , get , query_row , query_rows , params } ;
2025-04-27 23:11:37 -04:00
use serde ::Serialize ;
#[ derive(Serialize) ]
struct DeleteMessageEvent {
pub id : String ,
}
#[ cfg(feature = " redis " ) ]
use redis ::Commands ;
2025-04-25 16:22:38 -04:00
#[ cfg(feature = " sqlite " ) ]
use rusqlite ::Row ;
use tetratto_shared ::unix_epoch_timestamp ;
#[ cfg(feature = " postgres " ) ]
use tokio_postgres ::Row ;
impl DataManager {
/// Get a [`Message`] from an SQL row.
pub ( crate ) fn get_message_from_row (
#[ cfg(feature = " sqlite " ) ] x : & Row < '_ > ,
#[ cfg(feature = " postgres " ) ] x : & Row ,
) -> Message {
Message {
id : get ! ( x ->0 ( i64 ) ) as usize ,
channel : get ! ( x ->1 ( i64 ) ) as usize ,
owner : get ! ( x ->2 ( i64 ) ) as usize ,
created : get ! ( x ->3 ( i64 ) ) as usize ,
edited : get ! ( x ->4 ( i64 ) ) as usize ,
content : get ! ( x ->5 ( String ) ) ,
context : serde_json ::from_str ( & get! ( x ->6 ( String ) ) ) . unwrap ( ) ,
}
}
2025-04-27 23:11:37 -04:00
auto_method! ( get_message_by_id ( usize as i64 ) @ get_message_from_row -> " SELECT * FROM messages WHERE id = $1 " - - name = " message " - - returns = Message - - cache - key - tmpl = " atto.message:{} " ) ;
/// Complete a vector of just messages with their owner as well.
2025-05-01 16:43:58 -04:00
///
/// # Returns
/// `(message, owner, group with previous messages in ui)`
2025-04-27 23:11:37 -04:00
pub async fn fill_messages (
& self ,
messages : Vec < Message > ,
2025-04-29 16:53:34 -04:00
ignore_users : & [ usize ] ,
2025-05-01 16:43:58 -04:00
) -> Result < Vec < ( Message , User , bool ) > > {
let mut out : Vec < ( Message , User , bool ) > = Vec ::new ( ) ;
2025-04-27 23:11:37 -04:00
let mut users : HashMap < usize , User > = HashMap ::new ( ) ;
2025-05-01 16:43:58 -04:00
for ( i , message ) in messages . iter ( ) . enumerate ( ) {
let next_owner : usize = match messages . get ( i + 1 ) {
Some ( ref m ) = > m . owner ,
None = > 0 ,
} ;
2025-04-27 23:11:37 -04:00
let owner = message . owner ;
if ignore_users . contains ( & owner ) {
continue ;
}
if let Some ( user ) = users . get ( & owner ) {
2025-05-01 16:43:58 -04:00
out . push ( ( message . to_owned ( ) , user . clone ( ) , next_owner = = owner ) ) ;
2025-04-27 23:11:37 -04:00
} else {
let user = self . get_user_by_id ( owner ) . await ? ;
users . insert ( owner , user . clone ( ) ) ;
2025-05-01 16:43:58 -04:00
out . push ( ( message . to_owned ( ) , user , next_owner = = owner ) ) ;
2025-04-27 23:11:37 -04:00
}
}
Ok ( out )
}
2025-04-25 16:22:38 -04:00
/// Get all messages by channel (paginated).
///
/// # Arguments
/// * `channel` - the ID of the community to fetch channels for
/// * `batch` - the limit of items in each page
/// * `page` - the page number
pub async fn get_messages_by_channel (
& self ,
channel : usize ,
batch : usize ,
page : usize ,
) -> Result < Vec < Message > > {
let conn = match self . connect ( ) . await {
Ok ( c ) = > c ,
Err ( e ) = > return Err ( Error ::DatabaseConnection ( e . to_string ( ) ) ) ,
} ;
let res = query_rows! (
& conn ,
2025-04-27 23:11:37 -04:00
" SELECT * FROM messages WHERE channel = $1 ORDER BY created DESC LIMIT $2 OFFSET $3 " ,
2025-04-25 16:22:38 -04:00
& [ & ( channel as i64 ) , & ( batch as i64 ) , & ( ( page * batch ) as i64 ) ] ,
| x | { Self ::get_message_from_row ( x ) }
) ;
if res . is_err ( ) {
return Err ( Error ::GeneralNotFound ( " message " . to_string ( ) ) ) ;
}
Ok ( res . unwrap ( ) )
}
/// Create a new message in the database.
///
/// # Arguments
/// * `data` - a mock [`Message`] object to insert
pub async fn create_message ( & self , data : Message ) -> Result < ( ) > {
2025-04-27 23:11:37 -04:00
if data . content . len ( ) < 2 {
return Err ( Error ::DataTooLong ( " content " . to_string ( ) ) ) ;
}
if data . content . len ( ) > 2048 {
return Err ( Error ::DataTooLong ( " content " . to_string ( ) ) ) ;
}
2025-04-25 16:22:38 -04:00
let user = self . get_user_by_id ( data . owner ) . await ? ;
let channel = self . get_channel_by_id ( data . channel ) . await ? ;
// check user permission in community
let membership = self
. get_membership_by_owner_community ( user . id , channel . community )
. await ? ;
// check user permission to post in channel
2025-04-27 23:11:37 -04:00
if ! channel . check_post ( user . id , Some ( membership . role ) ) {
2025-04-25 16:22:38 -04:00
return Err ( Error ::NotAllowed ) ;
}
// ...
let conn = match self . connect ( ) . await {
Ok ( c ) = > c ,
Err ( e ) = > return Err ( Error ::DatabaseConnection ( e . to_string ( ) ) ) ,
} ;
let res = execute! (
& conn ,
" INSERT INTO messages VALUES ($1, $2, $3, $4, $5, $6, $7) " ,
params! [
& ( data . id as i64 ) ,
& ( data . channel as i64 ) ,
& ( data . owner as i64 ) ,
& ( data . created as i64 ) ,
& ( data . edited as i64 ) ,
& data . content ,
& serde_json ::to_string ( & data . context ) . unwrap ( )
]
) ;
if let Err ( e ) = res {
return Err ( Error ::DatabaseError ( e . to_string ( ) ) ) ;
}
2025-04-27 23:11:37 -04:00
// post event
let mut con = self . 2. get_con ( ) . await ;
if let Err ( e ) = con . publish ::< usize , String , ( ) > (
2025-04-29 16:53:34 -04:00
if channel . community ! = 0 {
// broadcast to community ws
channel . community
} else {
// broadcast to channel ws
channel . id
} ,
2025-04-27 23:11:37 -04:00
serde_json ::to_string ( & SocketMessage {
method : SocketMethod ::Message ,
2025-04-29 16:53:34 -04:00
data : serde_json ::to_string ( & ( data . channel . to_string ( ) , data ) ) . unwrap ( ) ,
2025-04-27 23:11:37 -04:00
} )
. unwrap ( ) ,
) {
return Err ( Error ::MiscError ( e . to_string ( ) ) ) ;
}
// ...
2025-04-25 16:22:38 -04:00
Ok ( ( ) )
}
pub async fn delete_message ( & self , id : usize , user : User ) -> Result < ( ) > {
let message = self . get_message_by_id ( id ) . await ? ;
let channel = self . get_channel_by_id ( message . channel ) . await ? ;
// check user permission in community
let membership = self
. get_membership_by_owner_community ( user . id , channel . community )
. await ? ;
if ! membership . role . check ( CommunityPermission ::MANAGE_MESSAGES )
& & ! user . permissions . check ( FinePermission ::MANAGE_MESSAGES )
{
return Err ( Error ::NotAllowed ) ;
} else if user . permissions . check ( FinePermission ::MANAGE_MESSAGES ) {
self . create_audit_log_entry ( AuditLogEntry ::new (
user . id ,
format! ( " invoked `delete_message` with x value ` {id} ` " ) ,
) )
. await ?
}
// ...
let conn = match self . connect ( ) . await {
Ok ( c ) = > c ,
Err ( e ) = > return Err ( Error ::DatabaseConnection ( e . to_string ( ) ) ) ,
} ;
let res = execute! ( & conn , " DELETE FROM messages WHERE id = $1 " , & [ & ( id as i64 ) ] ) ;
if let Err ( e ) = res {
return Err ( Error ::DatabaseError ( e . to_string ( ) ) ) ;
}
self . 2. remove ( format! ( " atto.message: {} " , id ) ) . await ;
2025-04-27 23:11:37 -04:00
// post event
let mut con = self . 2. get_con ( ) . await ;
if let Err ( e ) = con . publish ::< usize , String , ( ) > (
2025-04-29 16:53:34 -04:00
if channel . community ! = 0 {
// broadcast to community ws
channel . community
} else {
// broadcast to channel ws
channel . id
} ,
2025-04-27 23:11:37 -04:00
serde_json ::to_string ( & SocketMessage {
method : SocketMethod ::Delete ,
data : serde_json ::to_string ( & DeleteMessageEvent { id : id . to_string ( ) } ) . unwrap ( ) ,
} )
. unwrap ( ) ,
) {
return Err ( Error ::MiscError ( e . to_string ( ) ) ) ;
}
// ...
2025-04-25 16:22:38 -04:00
Ok ( ( ) )
}
pub async fn update_message_content ( & self , id : usize , user : User , x : String ) -> Result < ( ) > {
let y = self . get_message_by_id ( id ) . await ? ;
if user . id ! = y . owner {
if ! user . permissions . check ( FinePermission ::MANAGE_MESSAGES ) {
return Err ( Error ::NotAllowed ) ;
} else {
self . create_audit_log_entry ( AuditLogEntry ::new (
user . id ,
format! ( " invoked `update_message_content` with x value ` {id} ` " ) ,
) )
. await ?
}
}
// ...
let conn = match self . connect ( ) . await {
Ok ( c ) = > c ,
Err ( e ) = > return Err ( Error ::DatabaseConnection ( e . to_string ( ) ) ) ,
} ;
let res = execute! (
& conn ,
" UPDATE messages SET content = $1, edited = $2 WHERE id = $2 " ,
params! [ & x , & ( unix_epoch_timestamp ( ) as i64 ) , & ( id as i64 ) ]
) ;
if let Err ( e ) = res {
return Err ( Error ::DatabaseError ( e . to_string ( ) ) ) ;
}
// return
Ok ( ( ) )
}
}