Skip to content

Commit

Permalink
feat: handle read notifications from other clients
Browse files Browse the repository at this point in the history
The unread channel counters are now updated when a read notification is
recieved from another client.

Note that the counters are ephemeral and will be reset when the app is
restarted.

Marking a channel as read in Gurk does not update the unread counters in
other clients yet.

Related to #286
  • Loading branch information
boxdot committed Sep 16, 2024
1 parent 45f32f6 commit 74ee311
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 6 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ impl App {
}

pub async fn on_message(&mut self, content: Content) -> anyhow::Result<()> {
tracing::info!(?content, "incoming");
// tracing::info!(?content, "incoming");

#[cfg(feature = "dev")]
if self.config.developer.dump_raw_messages {
Expand All @@ -491,6 +491,10 @@ impl App {

let user_id = self.user_id;

if let ContentBody::SynchronizeMessage(SyncMessage { ref read, .. }) = content.body {
self.handle_read(read);
}

let (channel_idx, message) = match (content.metadata, content.body) {
// Private note message
(
Expand Down Expand Up @@ -1559,7 +1563,7 @@ fn add_emoji_from_sticker(body: &mut Option<String>, sticker: Option<Sticker>) {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;

use crate::config::User;
Expand All @@ -1570,7 +1574,7 @@ mod tests {
use std::cell::RefCell;
use std::rc::Rc;

fn test_app() -> (
pub(crate) fn test_app() -> (
App,
mpsc::UnboundedReceiver<Event>,
Rc<RefCell<Vec<Message>>>,
Expand Down
74 changes: 71 additions & 3 deletions src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::BTreeMap;

use anyhow::Context;
use presage::libsignal_service::content::Metadata;
use presage::proto::sync_message::Sent;
use presage::proto::sync_message::{Read, Sent};
use presage::proto::{DataMessage, EditMessage, SyncMessage};
use tracing::debug;

Expand All @@ -19,8 +21,6 @@ impl App {
return Ok(());
};

tracing::info!(?sync_message, "#########");

// edit message
if let Some(Sent {
edit_message:
Expand Down Expand Up @@ -88,6 +88,36 @@ impl App {

Ok(())
}

/// Handles read notifications
pub(crate) fn handle_read(&mut self, read: &[Read]) {
// First collect all the read counters to avoid hitting the storage for the same channel
let read_counters: BTreeMap<ChannelId, u32> = read
.iter()
.filter_map(|read| {
let arrived_at = read.timestamp?;
let channel_id = self.storage.message_channel(arrived_at)?;
let num_unread = self
.storage
.messages(channel_id)
.rev()
.take_while(|msg| arrived_at < msg.arrived_at)
.count();
let num_unread: u32 = num_unread.try_into().ok()?;
Some((channel_id, num_unread))
})
.collect();
// Update the unread counters
for (channel_id, num_unread) in read_counters {
if let Some(channel) = self.storage.channel(channel_id) {
if channel.unread_messages > 0 {
let mut channel = channel.into_owned();
channel.unread_messages = num_unread;
self.storage.store_channel(channel);
}
}
}
}
}

trait MessageExt {
Expand Down Expand Up @@ -122,3 +152,41 @@ impl MessageExt for SyncMessage {
}
}
}

#[cfg(test)]
mod tests {
use crate::app::tests::test_app;

use super::*;

#[test]
#[ignore = "forgetful storage does not support lookup by arrived_at"]
fn test_handle_read() {
let (mut app, _events, _sent_messages) = test_app();

let channel_id = *app.channels.items.first().unwrap();

// new incoming message
let message = app
.storage
.store_message(
channel_id,
Message::text(app.user_id, 42, "unread message".to_string()),
)
.into_owned();

// mark as unread
app.storage
.channel(channel_id)
.unwrap()
.into_owned()
.unread_messages = 1;

app.handle_read(&[Read {
timestamp: Some(message.arrived_at),
..Default::default()
}]);

assert_eq!(app.storage.channel(channel_id).unwrap().unread_messages, 0);
}
}
4 changes: 4 additions & 0 deletions src/storage/forgetful.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ impl Storage for ForgetfulStorage {
}

fn save(&mut self) {}

fn message_channel(&self, _arrived_at: u64) -> Option<ChannelId> {
None
}
}
23 changes: 23 additions & 0 deletions src/storage/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,16 @@ impl Storage for JsonStorage {
error!(error =% e, "failed to save json storage");
}
}

fn message_channel(&self, arrived_at: u64) -> Option<ChannelId> {
self.data.channels.items.iter().find_map(|channel| {
channel
.messages
.binary_search_by_key(&arrived_at, |msg| msg.arrived_at)
.is_ok()
.then_some(channel.id)
})
}
}

#[cfg(test)]
Expand Down Expand Up @@ -601,4 +611,17 @@ mod tests {
);
assert_eq!(storage.metadata().contacts_sync_request_at, Some(dt));
}

#[test]
fn test_json_storage_message_channel() {
let mut storage = json_storage_from_snapshot();
let channel_id = ChannelId::User(uuid!("966960e0-a8cd-43f1-ac7a-2c986dd470cd"));
let from_id = uuid!("00000000-0000-0000-0000-000000000000");
storage.store_message(
channel_id,
Message::text(from_id, 1664832050004, "hello".to_owned()),
);
assert_eq!(storage.message_channel(1664832050004), Some(channel_id));
assert_eq!(storage.message_channel(0), None);
}
}
5 changes: 5 additions & 0 deletions src/storage/memcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,9 @@ impl<S: Storage> Storage for MemCache<S> {
fn save(&mut self) {
self.storage.save();
}

fn message_channel(&self, arrived_at: u64) -> Option<ChannelId> {
// message arrived_at to channel_id conversion is not cached
self.storage.message_channel(arrived_at)
}
}
2 changes: 2 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub trait Storage {
/// Gets the message by id
fn message(&self, message_id: MessageId) -> Option<Cow<Message>>;

fn message_channel(&self, arrived_at: u64) -> Option<ChannelId>;

fn edits(
&self,
message_id: MessageId,
Expand Down
43 changes: 43 additions & 0 deletions src/storage/sql/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,36 @@ impl Storage for SqliteStorage {
}

fn save(&mut self) {}

fn message_channel(&self, arrived_at: u64) -> Option<ChannelId> {
struct SqlChannelId {
channel_id: ChannelId,
}

let arrived_at: i64 = arrived_at
.try_into()
.map_err(|_| MessageConvertError::InvalidTimestamp)
.ok_logged()?;

self.execute(|ctx| {
Box::pin(
sqlx::query_as!(
SqlChannelId,
r#"
SELECT
m.channel_id AS "channel_id: _"
FROM messages AS m
WHERE m.arrived_at = ?
LIMIT 1
"#,
arrived_at
)
.fetch_optional(ctx.conn),
)
})
.ok_logged()?
.map(|channel_id| channel_id.channel_id)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -958,4 +988,17 @@ mod tests {

assert_eq!(is_sqlite_encrypted_heuristics(&url), Some(true));
}

#[test]
fn test_sqlite_storage_message_channel() {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
let mut storage = fixtures();
let from_id = uuid!("966960e0-a8cd-43f1-ac7a-2c986dd470cd");
let channel_id = ChannelId::User(uuid!("a955d20f-6b83-4e69-846e-a99b1779ff7a"));
storage.store_message(
channel_id,
Message::text(from_id, 1664832050000, "hello".to_owned()),
);
assert_eq!(storage.message_channel(1664832050000), Some(channel_id));
}
}

0 comments on commit 74ee311

Please sign in to comment.