Skip to content

Commit

Permalink
feat(sdk): Add day dividers to the experimental timeline
Browse files Browse the repository at this point in the history
  • Loading branch information
zecakeh committed Dec 1, 2022
1 parent ee713d4 commit 7c60493
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/matrix-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ appservice = ["ruma/appservice-api-s"]
image-proc = ["dep:image"]
image-rayon = ["image-proc", "image?/jpeg_rayon"]

experimental-timeline = ["ruma/unstable-msc2677"]
experimental-timeline = ["ruma/unstable-msc2677", "dep:chrono"]

sliding-sync = [
"matrix-sdk-base/sliding-sync",
Expand All @@ -63,6 +63,7 @@ anymap2 = "0.13.0"
async-stream = "0.3.3"
async-trait = "0.1.53"
bytes = "1.1.0"
chrono = { version = "0.4.23", optional = true }
dashmap = "5.2.0"
derive_builder = { version = "0.11.2", optional = true }
event-listener = "2.5.2"
Expand Down
131 changes: 128 additions & 3 deletions crates/matrix-sdk/src/room/timeline/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::{collections::HashMap, sync::Arc};

use chrono::{Datelike, Local, TimeZone};
use futures_signals::signal_vec::MutableVecLockMut;
use indexmap::map::Entry;
use matrix_sdk_base::deserialized_responses::EncryptionInfo;
Expand Down Expand Up @@ -416,9 +417,37 @@ impl<'a, 'i> TimelineEventHandler<'a, 'i> {
let item = Arc::new(TimelineItem::Event(item));
match &self.flow {
Flow::Local { .. } => {
// Use the current time for local events.
let new_ts = MilliSecondsSinceUnixEpoch::now();

// Check if the latest event has the same date as this event.
if let Some(latest_event) = self
.timeline_items
.iter()
.rfind(|item| item.as_event().is_some())
.and_then(|item| item.as_event())
{
if let Some(old_ts) = latest_event.origin_server_ts() {
// If there is no origin_server_ts, it's a local event so we can assume
// it has the same date.
if let Some(day_divider_item) =
maybe_create_day_divider_from_timestamps(old_ts, new_ts)
{
self.timeline_items
.push_cloned(Arc::new(TimelineItem::Virtual(day_divider_item)));
}
}
} else {
// If there is not event item, there is no day divider yet.
let (year, month, day) = timestamp_to_ymd(new_ts);
self.timeline_items.push_cloned(Arc::new(TimelineItem::Virtual(
VirtualTimelineItem::day_divider(year, month, day),
)));
}

self.timeline_items.push_cloned(item);
}
Flow::Remote { txn_id, event_id, position, raw_event, .. } => {
Flow::Remote { txn_id, event_id, position, raw_event, origin_server_ts, .. } => {
if let Some(txn_id) = txn_id {
if let Some((idx, _old_item)) = find_event(self.timeline_items, txn_id) {
// TODO: Check whether anything is different about the
Expand Down Expand Up @@ -450,8 +479,67 @@ impl<'a, 'i> TimelineEventHandler<'a, 'i> {
}

match position {
TimelineItemPosition::Start => self.timeline_items.insert_cloned(0, item),
TimelineItemPosition::End => self.timeline_items.push_cloned(item),
TimelineItemPosition::Start => {
// Check if the earliest day divider has the same date as this event.
if let Some(old_ymd) =
self.timeline_items.get(0).and_then(|item| match item.as_virtual()? {
VirtualTimelineItem::DayDivider { year, month, day } => {
Some((*year, *month, *day))
}
VirtualTimelineItem::ReadMarker => None,
})
{
if let Some(day_divider_item) = maybe_create_day_divider_from_ymd(
old_ymd,
timestamp_to_ymd(*origin_server_ts),
) {
self.timeline_items.insert_cloned(
0,
Arc::new(TimelineItem::Virtual(day_divider_item)),
);
}
} else {
// The list must always start with a day divider.
let (year, month, day) = timestamp_to_ymd(*origin_server_ts);
self.timeline_items.insert_cloned(
0,
Arc::new(TimelineItem::Virtual(VirtualTimelineItem::day_divider(
year, month, day,
))),
);
}

self.timeline_items.insert_cloned(1, item)
}
TimelineItemPosition::End => {
// Check if the latest event has the same date as this event.
if let Some(latest_event) = self
.timeline_items
.iter()
.rfind(|item| item.as_event().is_some())
.and_then(|item| item.as_event())
{
let old_ts = latest_event
.origin_server_ts()
// Default to now for local events.
.unwrap_or_else(MilliSecondsSinceUnixEpoch::now);

if let Some(day_divider_item) =
maybe_create_day_divider_from_timestamps(old_ts, *origin_server_ts)
{
self.timeline_items
.push_cloned(Arc::new(TimelineItem::Virtual(day_divider_item)));
}
} else {
// If there is not event item, there is no day divider yet.
let (year, month, day) = timestamp_to_ymd(*origin_server_ts);
self.timeline_items.push_cloned(Arc::new(TimelineItem::Virtual(
VirtualTimelineItem::day_divider(year, month, day),
)));
}

self.timeline_items.push_cloned(item)
}
#[cfg(feature = "e2e-encryption")]
TimelineItemPosition::Update(idx) => self.timeline_items.set_cloned(*idx, item),
}
Expand Down Expand Up @@ -529,6 +617,43 @@ fn update_timeline_item(
maybe_update_timeline_item(timeline_items, event_id, action, move |item| Some(update(item)))
}

/// Converts a timestamp to a `(year, month, day)` tuple.
fn timestamp_to_ymd(ts: MilliSecondsSinceUnixEpoch) -> (i32, u32, u32) {
let datetime = Local
.timestamp_millis_opt(ts.0.into())
// Only returns `None` if date is after Dec 31, 262143 BCE.
.single()
// Fallback to the current date to avoid issues with malicious
// homeservers.
.unwrap_or_else(Local::now);

(datetime.year(), datetime.month(), datetime.day())
}

/// Returns a new day divider item for the new timestamp if it is on a different
/// day than the old timestamp
fn maybe_create_day_divider_from_timestamps(
old_ts: MilliSecondsSinceUnixEpoch,
new_ts: MilliSecondsSinceUnixEpoch,
) -> Option<VirtualTimelineItem> {
maybe_create_day_divider_from_ymd(timestamp_to_ymd(old_ts), timestamp_to_ymd(new_ts))
}

/// Returns a new day divider item for the new YMD `(year, month, day)` tuple if
/// it is on a different day than the old YMD tuple.
fn maybe_create_day_divider_from_ymd(
old_ymd: (i32, u32, u32),
new_ymd: (i32, u32, u32),
) -> Option<VirtualTimelineItem> {
let (old_year, old_month, old_day) = old_ymd;
let (new_year, new_month, new_day) = new_ymd;
if old_year != new_year || old_month != new_month || old_day != new_day {
Some(VirtualTimelineItem::day_divider(new_year, new_month, new_day))
} else {
None
}
}

struct NewEventTimelineItem {
content: TimelineItemContent,
reactions: BundledReactions,
Expand Down
98 changes: 88 additions & 10 deletions crates/matrix-sdk/src/room/timeline/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async fn reaction_redaction() {
let mut stream = timeline.stream();

timeline.handle_live_message_event(&ALICE, RoomMessageEventContent::text_plain("hi!")).await;
let _day_divider = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let event = item.as_event().unwrap();
assert_eq!(event.reactions().len(), 0);
Expand All @@ -70,7 +71,7 @@ async fn reaction_redaction() {
let rel = Annotation::new(msg_event_id.to_owned(), "+1".to_owned());
timeline.handle_live_message_event(&BOB, ReactionEventContent::new(rel)).await;
let item =
assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { index: 0, value }) => value);
assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { index: 1, value }) => value);
let event = item.as_event().unwrap();
assert_eq!(event.reactions().len(), 1);

Expand All @@ -80,7 +81,7 @@ async fn reaction_redaction() {

timeline.handle_live_redaction(&BOB, reaction_event_id).await;
let item =
assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { index: 0, value }) => value);
assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { index: 1, value }) => value);
let event = item.as_event().unwrap();
assert_eq!(event.reactions().len(), 0);
}
Expand All @@ -91,6 +92,7 @@ async fn invalid_edit() {
let mut stream = timeline.stream();

timeline.handle_live_message_event(&ALICE, RoomMessageEventContent::text_plain("test")).await;
let _day_divider = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let event = item.as_event().unwrap();
let msg = event.content.as_message().unwrap();
Expand All @@ -108,8 +110,8 @@ async fn invalid_edit() {
timeline.handle_live_message_event(&BOB, edit).await;

// Can't easily test the non-arrival of an item using the stream. Instead
// just assert that there is still just a single item in the timeline.
assert_eq!(timeline.inner.items.lock_ref().len(), 1);
// just assert that there is still just a couple items in the timeline.
assert_eq!(timeline.inner.items.lock_ref().len(), 2);
}

#[async_test]
Expand Down Expand Up @@ -137,6 +139,7 @@ async fn edit_redacted() {
},
}))
.await;
let _day_divider = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);

let redacted_event_id = item.as_event().unwrap().event_id().unwrap();
Expand All @@ -149,7 +152,7 @@ async fn edit_redacted() {
});
timeline.handle_live_message_event(&ALICE, edit).await;

assert_eq!(timeline.inner.items.lock_ref().len(), 1);
assert_eq!(timeline.inner.items.lock_ref().len(), 2);
}

#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -202,8 +205,9 @@ async fn unable_to_decrypt() {
)
.await;

assert_eq!(timeline.inner.items.lock_ref().len(), 1);
assert_eq!(timeline.inner.items.lock_ref().len(), 2);

let _day_divider = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let event = item.as_event().unwrap();
let session_id = assert_matches!(
Expand All @@ -230,10 +234,10 @@ async fn unable_to_decrypt() {
)
.await;

assert_eq!(timeline.inner.items.lock_ref().len(), 1);
assert_eq!(timeline.inner.items.lock_ref().len(), 2);

let item =
assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { index: 0, value }) => value);
assert_matches!(stream.next().await, Some(VecDiff::UpdateAt { index: 1, value }) => value);
let event = item.as_event().unwrap();
assert_matches!(&event.encryption_info, Some(_));
let text = assert_matches!(event.content(), TimelineItemContent::Message(msg) => msg.body());
Expand All @@ -246,6 +250,7 @@ async fn update_read_marker() {
let mut stream = timeline.stream();

timeline.handle_live_message_event(&ALICE, RoomMessageEventContent::text_plain("A")).await;
let _day_divider = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let event_id = item.as_event().unwrap().event_id().unwrap().to_owned();

Expand All @@ -258,7 +263,7 @@ async fn update_read_marker() {
let event_id = item.as_event().unwrap().event_id().unwrap().to_owned();

timeline.inner.set_fully_read_event(event_id.clone()).await;
assert_matches!(stream.next().await, Some(VecDiff::Move { old_index: 1, new_index: 2 }));
assert_matches!(stream.next().await, Some(VecDiff::Move { old_index: 2, new_index: 3 }));

// Nothing should happen if the fully read event isn't found.
timeline.inner.set_fully_read_event(event_id!("$fake_event_id").to_owned()).await;
Expand All @@ -272,7 +277,7 @@ async fn update_read_marker() {
let event_id = item.as_event().unwrap().event_id().unwrap().to_owned();

timeline.inner.set_fully_read_event(event_id).await;
assert_matches!(stream.next().await, Some(VecDiff::Move { old_index: 2, new_index: 3 }));
assert_matches!(stream.next().await, Some(VecDiff::Move { old_index: 3, new_index: 4 }));
}

#[async_test]
Expand All @@ -292,6 +297,7 @@ async fn invalid_event_content() {
}))
.await;

let _day_divider = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let event_item = item.as_event().unwrap();
assert_eq!(event_item.sender(), "@alice:example.org");
Expand Down Expand Up @@ -359,6 +365,78 @@ async fn invalid_event() {
assert_eq!(timeline.inner.items.lock_ref().len(), 0);
}

#[async_test]
async fn day_divider() {
let timeline = TestTimeline::new(&ALICE);
let mut stream = timeline.stream();

timeline
.handle_live_custom_event(json!({
"content": {
"msgtype": "m.text",
"body": "This is a first message on the first day"
},
"event_id": "$eeG0HA0FAZ37wP8kXlNkxx3I",
"origin_server_ts": 1669897395000u64,
"sender": "@alice:example.org",
"type": "m.room.message",
}))
.await;

let day_divider = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let (year, month, day) = assert_matches!(
day_divider.as_virtual().unwrap(),
VirtualTimelineItem::DayDivider { year, month, day } => (*year, *month, *day)
);
assert_eq!(year, 2022);
assert_eq!(month, 12);
assert_eq!(day, 1);

let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
item.as_event().unwrap();

timeline
.handle_live_custom_event(json!({
"content": {
"msgtype": "m.text",
"body": "This is a second message on the first day"
},
"event_id": "$feG0HA0FAZ37wP8kXlNkxx3I",
"origin_server_ts": 1669906604000u64,
"sender": "@alice:example.org",
"type": "m.room.message",
}))
.await;

let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
item.as_event().unwrap();

timeline
.handle_live_custom_event(json!({
"content": {
"msgtype": "m.text",
"body": "This is a first message on the next day"
},
"event_id": "$geG0HA0FAZ37wP8kXlNkxx3I",
"origin_server_ts": 1669992963000u64,
"sender": "@alice:example.org",
"type": "m.room.message",
}))
.await;

let day_divider = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
let (year, month, day) = assert_matches!(
day_divider.as_virtual().unwrap(),
VirtualTimelineItem::DayDivider { year, month, day } => (*year, *month, *day)
);
assert_eq!(year, 2022);
assert_eq!(month, 12);
assert_eq!(day, 2);

let item = assert_matches!(stream.next().await, Some(VecDiff::Push { value }) => value);
item.as_event().unwrap();
}

struct TestTimeline {
own_user_id: OwnedUserId,
inner: TimelineInner,
Expand Down
Loading

0 comments on commit 7c60493

Please sign in to comment.