diff --git a/crates/app/src/main.rs b/crates/app/src/main.rs
index 205a7a6..1cc0101 100644
--- a/crates/app/src/main.rs
+++ b/crates/app/src/main.rs
@@ -52,10 +52,9 @@ async fn main() {
let html_path = write_assets(&config).await;
// governor
- // (1000/125) * 32 = 256 requests/second
let governor_config = Arc::new(
GovernorConfigBuilder::default()
- .per_millisecond(125)
+ .per_millisecond(75)
.burst_size(32)
.finish()
.unwrap(),
diff --git a/crates/app/src/public/html/communities/base.html b/crates/app/src/public/html/communities/base.html
index 0c77de6..af180aa 100644
--- a/crates/app/src/public/html/communities/base.html
+++ b/crates/app/src/public/html/communities/base.html
@@ -100,7 +100,7 @@
{% if user %}
-
diff --git a/crates/app/src/public/js/me.js b/crates/app/src/public/js/me.js
index 7814de2..be5fc3b 100644
--- a/crates/app/src/public/js/me.js
+++ b/crates/app/src/public/js/me.js
@@ -240,18 +240,13 @@
streams.subscribe("notifs");
streams.event("notifs", "message", (data) => {
- if (data === "Ping") {
- return;
- }
-
- const json = JSON.parse(data);
- if (!json.method.Packet) {
+ if (!data.method.Packet) {
console.warn("notifications stream cannot read this message");
return;
}
- const inner_data = JSON.parse(json.data);
- if (json.method.Packet.Crud === "Create") {
+ const inner_data = JSON.parse(data.data);
+ if (data.method.Packet.Crud === "Create") {
const current = Number.parseInt(element.innerText || "0");
if (current <= 0) {
@@ -293,7 +288,7 @@
console.info("notification created");
}
- } else if (json.method.Packet.Crud === "Delete") {
+ } else if (data.method.Packet.Crud === "Delete") {
const current = Number.parseInt(element.innerText || "0");
if (current - 1 <= 0) {
diff --git a/crates/app/src/public/js/streams.js b/crates/app/src/public/js/streams.js
index add4dd4..1285d06 100644
--- a/crates/app/src/public/js/streams.js
+++ b/crates/app/src/public/js/streams.js
@@ -12,6 +12,14 @@
return $.STREAMS[stream];
});
+ self.define("get", ({ $ }, id) => {
+ for (const stream of Object.values($.STREAMS)) {
+ if (stream.id === id) {
+ return stream;
+ }
+ }
+ });
+
self.define("subscribe", ({ $ }, stream) => {
if (!$.USER) {
console.warn("cannot subscribe without user id");
@@ -27,6 +35,7 @@
const socket = new WebSocket(endpoint);
$.STREAMS[stream] = {
+ id: null,
socket,
events: {
message: () => {},
@@ -38,10 +47,17 @@
return socket.send("Pong");
}
- return $.sock(stream).events.message(event.data);
+ const data = JSON.parse(event.data);
+
+ if (data.method.Forward === "Key") {
+ $.STREAMS[stream].id = data.data;
+ return console.info(`${stream} ${data.data}`);
+ }
+
+ return $.sock(stream).events.message(data);
});
- return socket;
+ return $.STREAMS[stream];
});
self.define("close", ({ $ }, stream) => {
diff --git a/crates/app/src/routes/api/v1/auth/profile.rs b/crates/app/src/routes/api/v1/auth/profile.rs
index a3a2b7e..40efd9a 100644
--- a/crates/app/src/routes/api/v1/auth/profile.rs
+++ b/crates/app/src/routes/api/v1/auth/profile.rs
@@ -456,6 +456,18 @@ pub async fn handle_socket(socket: WebSocket, db: DataManager, user_id: String,
// get channel permissions
let channel = format!("{user_id}/{stream_id}");
+ // identify socket
+ sink.send(WsMessage::Text(
+ serde_json::to_string(&SocketMessage {
+ method: SocketMethod::Forward(PacketType::Key),
+ data: socket_id.clone(),
+ })
+ .unwrap()
+ .into(),
+ ))
+ .await
+ .unwrap();
+
// ...
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(WsMessage::Text(text))) = stream.next().await {
diff --git a/crates/core/src/model/socket.rs b/crates/core/src/model/socket.rs
index f526aa8..08f8c96 100644
--- a/crates/core/src/model/socket.rs
+++ b/crates/core/src/model/socket.rs
@@ -14,6 +14,8 @@ pub enum PacketType {
Text,
/// A CRUD operation.
Crud(CrudMessageType),
+ /// A text key which identifies the socket.
+ Key,
}
#[derive(Serialize, Deserialize, PartialEq, Eq)]