Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSC2716 federation changes #175

Merged
merged 12 commits into from
Sep 8, 2021
105 changes: 76 additions & 29 deletions tests/msc2716_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ func TestBackfillingHistory(t *testing.T) {
})

t.Run("Historical messages are visible when joining on federated server - auto-generated base insertion event", func(t *testing.T) {
t.Skip("Skipping until federation is implemented")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing as matrix-org/synapse#10245 is now merged

t.Parallel()

roomID := as.CreateRoom(t, createRoomOpts)
Expand Down Expand Up @@ -392,7 +391,6 @@ func TestBackfillingHistory(t *testing.T) {
})

t.Run("Historical messages are visible when joining on federated server - pre-made insertion event", func(t *testing.T) {
t.Skip("Skipping until federation is implemented")
kegsay marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()

roomID := as.CreateRoom(t, createRoomOpts)
Expand Down Expand Up @@ -465,7 +463,7 @@ func TestBackfillingHistory(t *testing.T) {
})

t.Run("Historical messages are visible when already joined on federated server", func(t *testing.T) {
t.Skip("Skipping until federation is implemented")
kegsay marked this conversation as resolved.
Show resolved Hide resolved
//t.Skip("Skipping until federation is implemented")
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()

roomID := as.CreateRoom(t, createRoomOpts)
Expand All @@ -484,7 +482,7 @@ func TestBackfillingHistory(t *testing.T) {
// Mimic scrollback just through the latest messages
remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"dir": []string{"b"},
// Limited so we can only see a few of the latest messages
// Limited so we can only see a portion of the latest messages
"limit": []string{"5"},
}))

Expand All @@ -506,7 +504,7 @@ func TestBackfillingHistory(t *testing.T) {

// [1 insertion event + 2 historical events + 1 chunk event + 1 insertion event]
if len(historicalEventIDs) != 5 {
t.Fatalf("Expected eventID list should be length 15 but saw %d: %s", len(historicalEventIDs), historicalEventIDs)
t.Fatalf("Expected eventID list should be length 5 but saw %d: %s", len(historicalEventIDs), historicalEventIDs)
}

beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
Expand Down Expand Up @@ -536,30 +534,16 @@ func TestBackfillingHistory(t *testing.T) {
},
})

// Send a marker event to let all of the homeservers know about the
// insertion point where all of the historical messages are at
markerEvent := b.Event{
Type: markerEventType,
Content: map[string]interface{}{
markerInsertionContentField: baseInsertionEventID,
},
}
// We can't use as.SendEventSynced(...) because application services can't use the /sync API
markerSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", markerEvent.Type, "txn-m123"}, client.WithJSONBody(t, markerEvent.Content))
markerSendBody := client.ParseJSON(t, markerSendRes)
markerEventID := client.GetJSONFieldStr(t, markerSendBody, "event_id")

// Make sure the marker event has reached the remote homeserver
remoteCharlie.SyncUntilTimelineHas(t, roomID, func(ev gjson.Result) bool {
return ev.Get("event_id").Str == markerEventID
})
// Send the marker event
sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID)
kegsay marked this conversation as resolved.
Show resolved Hide resolved

messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"dir": []string{"b"},
"limit": []string{"100"},
}))

must.MatchResponse(t, messagesRes, match.HTTPResponse{
// Make sure all of the historical messages are visible when we scrollback again
must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{
JSON: []match.JSON{
match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
return r.Get("event_id").Str
Expand All @@ -569,7 +553,7 @@ func TestBackfillingHistory(t *testing.T) {
})

t.Run("When messages have already been scrolled back through, new historical messages are visible in next scroll back on federated server", func(t *testing.T) {
t.Skip("Skipping until federation is implemented")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing as matrix-org/synapse#10498 is now merged

//t.Skip("Skipping until federation is implemented")
t.Parallel()

roomID := as.CreateRoom(t, createRoomOpts)
Expand Down Expand Up @@ -607,15 +591,48 @@ func TestBackfillingHistory(t *testing.T) {
)
batchSendResBody := client.ParseJSON(t, batchSendRes)
historicalEventIDs := getEventsFromBatchSendResponseBody(t, batchSendResBody)
baseInsertionEventID := historicalEventIDs[len(historicalEventIDs)-1]

// TODO: Send marker event
// [1 insertion event + 2 historical events + 1 chunk event + 1 insertion event]
if len(historicalEventIDs) != 5 {
t.Fatalf("Expected eventID list should be length 5 but saw %d: %s", len(historicalEventIDs), historicalEventIDs)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
}

messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"dir": []string{"b"},
"limit": []string{"100"},
}))
beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes)
eventDebugStringsFromBeforeMarkerResponse := getRelevantEventDebugStringsFromMessagesResponse(t, beforeMarkerMesssageResBody)
// Since the original body can only be read once, create a new one from the body bytes we just read
beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody))
// Make sure the history isn't visible before we expect it to be there.
// This is to avoid some bug in the homeserver using some unknown
// mechanism to distribute the historical messages to other homeservers.
must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{
JSON: []match.JSON{
match.JSONArrayEach("chunk", func(r gjson.Result) error {
// Throw if we find one of the historical events in the message response
for _, historicalEventID := range historicalEventIDs {
if r.Get("event_id").Str == historicalEventID {
return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs)
}
}
return nil
}),
},
})

// Send the marker event
sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID)

remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"dir": []string{"b"},
"limit": []string{"100"},
}))

must.MatchResponse(t, messagesRes, match.HTTPResponse{
// Make sure all of the historical messages are visible when we scrollback again
must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{
JSON: []match.JSON{
match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
return r.Get("event_id").Str
Expand Down Expand Up @@ -649,7 +666,7 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string,
checkCounter := 0
for {
if time.Since(start) > c.SyncUntilTimeout {
t.Fatalf("fetchMessagesUntilResponseHas timed out. Called check function %d times", checkCounter)
t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", checkCounter)
kegsay marked this conversation as resolved.
Show resolved Hide resolved
}

messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
Expand Down Expand Up @@ -734,6 +751,36 @@ func ensureVirtualUserRegistered(t *testing.T, c *client.CSAPI, virtualUserLocal
}
}

func sendMarkerAndEnsureBackfilled(t *testing.T, as *client.CSAPI, c *client.CSAPI, roomID, insertionEventID string) {
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
// Send a marker event to let all of the homeservers know about the
// insertion point where all of the historical messages are at
markerEvent := b.Event{
Type: markerEventType,
Content: map[string]interface{}{
markerInsertionContentField: insertionEventID,
},
}
// We can't use as.SendEventSynced(...) because application services can't use the /sync API
markerSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", markerEvent.Type, "txn-m123"}, client.WithJSONBody(t, markerEvent.Content))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a problem that the txn id never changes?

Copy link
Contributor Author

@MadLittleMods MadLittleMods Sep 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the homeserver is new every run, I think it's fine. I did notice that txn-m123 was re-used once, so I updated to something different.

Might be nice to add a helper function to client to get a random unique one easily (using txnId in there). I added an internal one in #192

markerSendBody := client.ParseJSON(t, markerSendRes)
markerEventID := client.GetJSONFieldStr(t, markerSendBody, "event_id")

// Make sure the marker event has reached the remote homeserver
c.SyncUntilTimelineHas(t, roomID, func(ev gjson.Result) bool {
return ev.Get("event_id").Str == markerEventID
})

// Make sure all of the base insertion event has been backfilled
// after the marker was received
fetchUntilMessagesResponseHas(t, c, roomID, func(ev gjson.Result) bool {
if ev.Get("event_id").Str == insertionEventID {
return true
}

return false
})
}

func createMessagesInRoom(t *testing.T, c *client.CSAPI, roomID string, count int) (eventIDs []string) {
eventIDs = make([]string, count)
for i := 0; i < len(eventIDs); i++ {
Expand Down