add: move database drivers to oiseau

This commit is contained in:
trisua 2025-06-08 14:15:42 -04:00
parent 40fce4bc77
commit 81036e3733
57 changed files with 638 additions and 1106 deletions

View file

@ -516,7 +516,9 @@ pub async fn subscription_handler(
pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String, stream_id: String) {
let (mut sink, mut stream) = socket.split();
let socket_id = tetratto_shared::hash::salt();
db.2.incr("atto.active_connections:users".to_string()).await;
db.0.1
.incr("atto.active_connections:users".to_string())
.await;
// get channel permissions
let channel = format!("{user_id}/{stream_id}");
@ -551,7 +553,7 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String,
let heartbeat_c = heartbeat_uri.clone();
let mut redis_task = tokio::spawn(async move {
// forward messages from redis to the socket
let mut pubsub = dbc.2.client.get_async_pubsub().await.unwrap();
let mut pubsub = dbc.0.1.client.get_async_pubsub().await.unwrap();
pubsub.subscribe(channel_c).await.unwrap();
pubsub.subscribe(heartbeat_c).await.unwrap();
@ -581,7 +583,7 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String,
}
});
let db2c = db.2.clone();
let db2c = db.0.1.clone();
let heartbeat_c = heartbeat_uri.clone();
let heartbeat_task = tokio::spawn(async move {
let mut con = db2c.get_con().await;
@ -608,7 +610,9 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String,
}
heartbeat_task.abort(); // kill
db.2.decr("atto.active_connections:users".to_string()).await;
db.0.1
.decr("atto.active_connections:users".to_string())
.await;
tracing::info!("socket terminate");
}
@ -628,7 +632,7 @@ pub async fn post_to_socket_request(
return Json(Error::NotAllowed.into());
}
let mut con = data.2.get_con().await;
let mut con = data.0.1.get_con().await;
con.publish::<String, String, ()>(
format!("{user_id}/{id}"),
serde_json::to_string(&msg).unwrap(),