diff --git a/Cargo.lock b/Cargo.lock index 4711ecc..cdaeba2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -627,7 +627,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" dependencies = [ "bytes", + "futures-core", "memchr", + "pin-project-lite", + "tokio", + "tokio-util", ] [[package]] @@ -815,6 +819,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.7.0" @@ -1014,6 +1032,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "forwarded-header-value" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8835f84f38484cc86f110a805655697908257fb9a7af005234060891557198e9" +dependencies = [ + "nonempty", + "thiserror 1.0.69", +] + [[package]] name = "futf" version = "0.1.5" @@ -1069,6 +1097,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -1114,9 +1148,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasi 0.14.2+wasi-0.2.4", + "wasm-bindgen", ] [[package]] @@ -1159,6 +1195,29 @@ dependencies = [ "walkdir", ] +[[package]] +name = "governor" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be93b4ec2e4710b04d9264c0c7350cdd62a8c20e5e4ac732552ebb8f0debe8eb" +dependencies = [ + "cfg-if", + "dashmap", + "futures-sink", + "futures-timer", + "futures-util", + "getrandom 0.3.2", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta", + "rand 0.9.1", + "smallvec", + "spinning_top", + "web-time", +] + [[package]] name = "h2" version = "0.4.8" @@ -1188,6 +1247,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1203,7 +1268,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" dependencies = [ - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1604,7 +1669,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1915,6 +1980,12 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -1925,6 +1996,18 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonempty" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" + +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "noop_proc_macro" version = "0.3.0" @@ -2234,6 +2317,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2278,6 +2381,12 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "portable-atomic" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" + [[package]] name = "postgres-protocol" version = "0.6.8" @@ -2392,6 +2501,21 @@ dependencies = [ "qrcodegen", ] +[[package]] +name = "quanta" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "2.0.1" @@ -2531,6 +2655,15 @@ dependencies = [ "rgb", ] +[[package]] +name = "raw-cpuid" +version = "11.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146" +dependencies = [ + "bitflags 2.9.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -2558,13 +2691,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "438a4e5f8e9aa246d6f3666d6978441bf1b37d5f417b50c4dd220be09f5fcc17" dependencies = [ "arc-swap", + "bytes", + "cfg-if", "combine", + "futures-util", "itoa", "num-bigint", "percent-encoding", + "pin-project-lite", "ryu", "sha1_smol", "socket2", + "tokio", + "tokio-util", "url", ] @@ -3027,6 +3166,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -3255,6 +3403,7 @@ dependencies = [ "tetratto-shared", "tokio", "tower-http", + "tower_governor", "tracing", "tracing-subscriber", ] @@ -3633,6 +3782,22 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" +[[package]] +name = "tower_governor" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84e6672c7510df74859726427edea641674dad1aeeb30057b87335b1ba23b843" +dependencies = [ + "axum", + "forwarded-header-value", + "governor", + "http", + "pin-project", + "thiserror 2.0.12", + "tower", + "tracing", +] + [[package]] name = "tracing" version = "0.1.41" @@ -4051,6 +4216,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "web_atoms" version = "0.1.0" diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 8bdbeda..689fb17 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -15,7 +15,7 @@ serde = { version = "1.0.219", features = ["derive"] } tera = "1.20.0" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } -tower-http = { version = "0.6.2", features = ["trace", "fs"] } +tower-http = { version = "0.6.2", features = ["trace", "fs", "catch-panic"] } axum = { version = "0.8.3", features = ["macros", "ws"] } tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } axum-extra = { version = "0.10.1", features = ["cookie", "multipart"] } @@ -35,4 +35,5 @@ cf-turnstile = "0.2.0" contrasted = "0.1.2" futures-util = "0.3.31" -redis = { version = "0.30.0", optional = true } +redis = { version = "0.30.0", features = ["aio", "tokio-comp"], optional = true } +tower_governor = "0.7.0" diff --git a/crates/app/src/main.rs b/crates/app/src/main.rs index e94f0a5..205a7a6 100644 --- a/crates/app/src/main.rs +++ b/crates/app/src/main.rs @@ -11,10 +11,14 @@ pub use tetratto_core::*; use axum::{Extension, Router}; use reqwest::Client; use tera::{Tera, Value}; -use tower_http::trace::{self, TraceLayer}; +use tower_http::{ + trace::{self, TraceLayer}, + catch_panic::CatchPanicLayer, +}; +use tower_governor::{GovernorLayer, governor::GovernorConfigBuilder}; use tracing::{Level, info}; -use std::{collections::HashMap, env::var, process::exit, sync::Arc}; +use std::{collections::HashMap, env::var, net::SocketAddr, process::exit, sync::Arc, time::Duration}; use tokio::sync::RwLock; pub(crate) type State = Arc>; @@ -47,6 +51,27 @@ async fn main() { init_dirs(&config).await; let html_path = write_assets(&config).await; + // governor + // (1000/125) * 32 = 256 requests/second + let governor_config = Arc::new( + GovernorConfigBuilder::default() + .per_millisecond(125) + .burst_size(32) + .finish() + .unwrap(), + ); + + let governor_limiter = governor_config.limiter().clone(); + let governor_interval = Duration::from_secs(60); + + std::thread::spawn(move || { + loop { + std::thread::sleep(governor_interval); + tracing::info!("rate limiting storage size: {}", governor_limiter.len()); + governor_limiter.retain_recent(); + } + }); + // ... let database = DataManager::new(config.clone()).await.unwrap(); database.init().await.unwrap(); @@ -78,7 +103,11 @@ async fn main() { TraceLayer::new_for_http() .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO)) .on_response(trace::DefaultOnResponse::new().level(Level::INFO)), - ); + ) + .layer(CatchPanicLayer::new()) + .layer(GovernorLayer { + config: governor_config, + }); let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", config.port)) .await @@ -86,5 +115,10 @@ async fn main() { info!("🐇 tetratto."); info!("listening on http://0.0.0.0:{}", config.port); - axum::serve(listener, app).await.unwrap(); + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .await + .unwrap(); } diff --git a/crates/app/src/routes/api/v1/auth/profile.rs b/crates/app/src/routes/api/v1/auth/profile.rs index 1a2c5f0..a3a2b7e 100644 --- a/crates/app/src/routes/api/v1/auth/profile.rs +++ b/crates/app/src/routes/api/v1/auth/profile.rs @@ -443,15 +443,15 @@ pub async fn subscription_handler( let data = data.clone(); Ok(ws.on_upgrade(|socket| async move { - tokio::spawn(async move { - handle_socket(socket, data, user_id, id).await; - }); + handle_socket(socket, data, user_id, id).await; })) } #[cfg(feature = "redis")] pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String, stream_id: String) { let (mut sink, mut stream) = socket.split(); + let socket_id = tetratto_shared::hash::salt(); + db.2.incr("atto.active_connections:users".to_string()).await; // get channel permissions let channel = format!("{user_id}/{stream_id}"); @@ -463,21 +463,25 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String, continue; } - drop(stream); break; } }); + let heartbeat_uri = format!("{channel}/{socket_id}"); + let dbc = db.clone(); let channel_c = channel.clone(); + let heartbeat_c = heartbeat_uri.clone(); let mut redis_task = tokio::spawn(async move { // forward messages from redis to the socket - let mut con = dbc.2.get_con().await; - let mut pubsub = con.as_pubsub(); - pubsub.subscribe(channel_c).unwrap(); + let mut pubsub = dbc.2.client.get_async_pubsub().await.unwrap(); + + pubsub.subscribe(channel_c).await.unwrap(); + pubsub.subscribe(heartbeat_c).await.unwrap(); // listen for pubsub messages - while let Ok(msg) = pubsub.get_message() { + let mut pubsub = pubsub.into_on_message(); + while let Some(msg) = pubsub.next().await { // payload is a stringified SocketMessage let smsg = msg.get_payload::().unwrap(); let packet: SocketMessage = serde_json::from_str(&smsg).unwrap(); @@ -485,18 +489,15 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String, if packet.method == SocketMethod::Forward(PacketType::Ping) { // forward with custom message if sink.send(WsMessage::Text("Ping".into())).await.is_err() { - drop(sink); break; } } else if packet.method == SocketMethod::Message { if sink.send(WsMessage::Text(smsg.into())).await.is_err() { - drop(sink); break; } } else { // forward to client if sink.send(WsMessage::Text(smsg.into())).await.is_err() { - drop(sink); break; } } @@ -504,14 +505,14 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String, }); let db2c = db.2.clone(); - let channel_c = channel.clone(); + let heartbeat_c = heartbeat_uri.clone(); let heartbeat_task = tokio::spawn(async move { let mut con = db2c.get_con().await; - let mut heartbeat = tokio::time::interval(Duration::from_secs(30)); + let mut heartbeat = tokio::time::interval(Duration::from_secs(10)); loop { con.publish::<&str, String, ()>( - &channel_c, + &heartbeat_c, serde_json::to_string(&SocketMessage { method: SocketMethod::Forward(PacketType::Ping), data: "Ping".to_string(), @@ -524,7 +525,6 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String, } }); - db.2.incr("atto.active_connections:users".to_string()).await; tokio::select! { _ = (&mut recv_task) => redis_task.abort(), _ = (&mut redis_task) => recv_task.abort() diff --git a/crates/app/src/routes/api/v1/channels/messages.rs b/crates/app/src/routes/api/v1/channels/messages.rs index b7b9835..9f2ffe8 100644 --- a/crates/app/src/routes/api/v1/channels/messages.rs +++ b/crates/app/src/routes/api/v1/channels/messages.rs @@ -187,14 +187,14 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, community_id: Str let dbc = db.clone(); let mut redis_task = tokio::spawn(async move { // forward messages from redis to the socket - let mut con = dbc.2.get_con().await; - let mut pubsub = con.as_pubsub(); + let mut pubsub = dbc.2.client.get_async_pubsub().await.unwrap(); - pubsub.subscribe(user.id).unwrap(); - pubsub.subscribe(community_id.clone()).unwrap(); + pubsub.subscribe(user.id).await.unwrap(); + pubsub.subscribe(community_id.clone()).await.unwrap(); // listen for pubsub messages - while let Ok(msg) = pubsub.get_message() { + let mut pubsub = pubsub.into_on_message(); + while let Some(msg) = pubsub.next().await { // payload is a stringified SocketMessage let smsg = msg.get_payload::().unwrap(); let packet: SocketMessage = serde_json::from_str(&smsg).unwrap(); @@ -241,7 +241,7 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, community_id: Str let db2c = db.2.clone(); let heartbeat_task = tokio::spawn(async move { let mut con = db2c.get_con().await; - let mut heartbeat = tokio::time::interval(Duration::from_secs(30)); + let mut heartbeat = tokio::time::interval(Duration::from_secs(10)); loop { con.publish::( diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index a190074..c8f170e 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -23,7 +23,7 @@ async-recursion = "1.1.1" md-5 = "0.10.6" base16ct = { version = "0.2.0", features = ["alloc"] } -redis = { version = "0.30.0", optional = true } +redis = { version = "0.30.0", features = ["aio", "tokio-comp"], optional = true } rusqlite = { version = "0.35.0", optional = true }