Skip to content

Commit

Permalink
Use PDU not *Event in HeaderedEvent (#3073)
Browse files Browse the repository at this point in the history
Requires matrix-org/gomatrixserverlib#376

This has numerous upsides:
 - Less type casting to `*Event` is required.
- Making Dendrite work with `PDU` interfaces means we can swap out Event
impls more easily.
 - Tests which represent weird event shapes are easier to write.

Part of a series of refactors on GMSL.
  • Loading branch information
kegsay committed May 2, 2023
1 parent 696cbb7 commit f5b3144
Show file tree
Hide file tree
Showing 64 changed files with 296 additions and 284 deletions.
2 changes: 1 addition & 1 deletion appservice/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (s *OutputRoomEventConsumer) sendEvents(

// If txnID is not defined, generate one from the events.
if txnID == "" {
txnID = fmt.Sprintf("%d_%d", events[0].Event.OriginServerTS(), len(transaction))
txnID = fmt.Sprintf("%d_%d", events[0].PDU.OriginServerTS(), len(transaction))
}

// Send the transaction to the appservice.
Expand Down
6 changes: 4 additions & 2 deletions clientapi/routing/aliases.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package routing

import (
"encoding/json"
"fmt"
"net/http"

Expand Down Expand Up @@ -48,11 +49,12 @@ func GetAliases(
visibility := gomatrixserverlib.HistoryVisibilityInvited
if historyVisEvent, ok := stateRes.StateEvents[stateTuple]; ok {
var err error
visibility, err = historyVisEvent.HistoryVisibility()
if err != nil {
var content gomatrixserverlib.HistoryVisibilityContent
if err = json.Unmarshal(historyVisEvent.Content(), &content); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("historyVisEvent.HistoryVisibility failed")
return util.ErrorResponse(fmt.Errorf("historyVisEvent.HistoryVisibility: %w", err))
}
visibility = content.HistoryVisibility
}
if visibility != spec.WorldReadable {
queryReq := api.QueryMembershipForUserRequest{
Expand Down
6 changes: 3 additions & 3 deletions clientapi/routing/createroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func createRoom(
}

// Add the event to the list of auth events
builtEvents = append(builtEvents, &types.HeaderedEvent{Event: ev})
builtEvents = append(builtEvents, &types.HeaderedEvent{PDU: ev})
err = authEvents.AddEvent(ev)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("authEvents.AddEvent failed")
Expand Down Expand Up @@ -536,7 +536,7 @@ func createRoom(
case spec.MRoomMember:
fallthrough
case spec.MRoomJoinRules:
ev := event.Event
ev := event.PDU
globalStrippedState = append(
globalStrippedState,
fclient.NewInviteV2StrippedState(ev),
Expand All @@ -558,7 +558,7 @@ func createRoom(
}
inviteStrippedState := append(
globalStrippedState,
fclient.NewInviteV2StrippedState(inviteEvent.Event),
fclient.NewInviteV2StrippedState(inviteEvent.PDU),
)
// Send the invite event to the roomserver.
event := inviteEvent
Expand Down
2 changes: 1 addition & 1 deletion clientapi/routing/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func SetVisibility(
}

// NOTSPEC: Check if the user's power is greater than power required to change m.room.canonical_alias event
power, _ := gomatrixserverlib.NewPowerLevelContentFromEvent(queryEventsRes.StateEvents[0].Event)
power, _ := gomatrixserverlib.NewPowerLevelContentFromEvent(queryEventsRes.StateEvents[0].PDU)
if power.UserLevel(dev.UserID) < power.EventLevel(spec.MRoomCanonicalAlias, true) {
return util.JSONResponse{
Code: http.StatusForbidden,
Expand Down
12 changes: 6 additions & 6 deletions clientapi/routing/sendevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func SendEvent(
req.Context(), rsAPI,
api.KindNew,
[]*types.HeaderedEvent{
&types.HeaderedEvent{Event: e},
&types.HeaderedEvent{PDU: e},
},
device.UserDomain(),
domain,
Expand Down Expand Up @@ -259,7 +259,7 @@ func generateSendEvent(
cfg *config.ClientAPI,
rsAPI api.ClientRoomserverAPI,
evTime time.Time,
) (*gomatrixserverlib.Event, *util.JSONResponse) {
) (gomatrixserverlib.PDU, *util.JSONResponse) {
// parse the incoming http request
userID := device.UserID

Expand Down Expand Up @@ -313,12 +313,12 @@ func generateSendEvent(
}

// check to see if this user can perform this operation
stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents))
stateEvents := make([]gomatrixserverlib.PDU, len(queryRes.StateEvents))
for i := range queryRes.StateEvents {
stateEvents[i] = queryRes.StateEvents[i].Event
stateEvents[i] = queryRes.StateEvents[i].PDU
}
provider := gomatrixserverlib.NewAuthEvents(gomatrixserverlib.ToPDUs(stateEvents))
if err = gomatrixserverlib.Allowed(e.Event, &provider); err != nil {
if err = gomatrixserverlib.Allowed(e.PDU, &provider); err != nil {
return nil, &util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden(err.Error()), // TODO: Is this error string comprehensible to the client?
Expand All @@ -343,5 +343,5 @@ func generateSendEvent(
}
}

return e.Event, nil
return e.PDU, nil
}
2 changes: 1 addition & 1 deletion clientapi/routing/server_notices.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func SendServerNotice(
ctx, rsAPI,
api.KindNew,
[]*types.HeaderedEvent{
&types.HeaderedEvent{Event: e},
&types.HeaderedEvent{PDU: e},
},
device.UserDomain(),
cfgClient.Matrix.ServerName,
Expand Down
16 changes: 7 additions & 9 deletions cmd/resolve-state/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ func main() {
panic(err)
}

events := make(map[types.EventNID]*gomatrixserverlib.Event, len(eventEntries))
events := make(map[types.EventNID]gomatrixserverlib.PDU, len(eventEntries))
for _, entry := range eventEntries {
events[entry.EventNID] = entry.Event
events[entry.EventNID] = entry.PDU
}

if len(removed) > 0 {
Expand Down Expand Up @@ -155,9 +155,9 @@ func main() {
}

authEventIDMap := make(map[string]struct{})
events := make([]*gomatrixserverlib.Event, len(eventEntries))
events := make([]gomatrixserverlib.PDU, len(eventEntries))
for i := range eventEntries {
events[i] = eventEntries[i].Event
events[i] = eventEntries[i].PDU
for _, authEventID := range eventEntries[i].AuthEventIDs() {
authEventIDMap[authEventID] = struct{}{}
}
Expand All @@ -174,17 +174,15 @@ func main() {
panic(err)
}

authEvents := make([]*gomatrixserverlib.Event, len(authEventEntries))
authEvents := make([]gomatrixserverlib.PDU, len(authEventEntries))
for i := range authEventEntries {
authEvents[i] = authEventEntries[i].Event
authEvents[i] = authEventEntries[i].PDU
}

fmt.Println("Resolving state")
var resolved Events
resolved, err = gomatrixserverlib.ResolveConflicts(
gomatrixserverlib.RoomVersion(*roomVersion),
gomatrixserverlib.ToPDUs(events),
gomatrixserverlib.ToPDUs(authEvents),
gomatrixserverlib.RoomVersion(*roomVersion), events, authEvents,
)
if err != nil {
panic(err)
Expand Down
22 changes: 11 additions & 11 deletions federationapi/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,9 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rew
addsStateEvents = append(addsStateEvents, eventsRes.Events...)
}

evs := make([]*gomatrixserverlib.Event, len(addsStateEvents))
evs := make([]gomatrixserverlib.PDU, len(addsStateEvents))
for i := range evs {
evs[i] = addsStateEvents[i].Event
evs[i] = addsStateEvents[i].PDU
}

addsJoinedHosts, err := JoinedHostsFromEvents(evs)
Expand Down Expand Up @@ -340,7 +340,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
ore.AddsStateEventIDs, ore.RemovesStateEventIDs,
ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs,
)
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event.Event)
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event.PDU)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -374,7 +374,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
}

// handle peeking hosts
inboundPeeks, err := s.db.GetInboundPeeks(s.ctx, ore.Event.Event.RoomID())
inboundPeeks, err := s.db.GetInboundPeeks(s.ctx, ore.Event.PDU.RoomID())
if err != nil {
return nil, err
}
Expand All @@ -394,7 +394,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
// JoinedHostsFromEvents turns a list of state events into a list of joined hosts.
// This errors if one of the events was invalid.
// It should be impossible for an invalid event to get this far in the pipeline.
func JoinedHostsFromEvents(evs []*gomatrixserverlib.Event) ([]types.JoinedHost, error) {
func JoinedHostsFromEvents(evs []gomatrixserverlib.PDU) ([]types.JoinedHost, error) {
var joinedHosts []types.JoinedHost
for _, ev := range evs {
if ev.Type() != "m.room.member" || ev.StateKey() == nil {
Expand Down Expand Up @@ -459,20 +459,20 @@ func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []s

// lookupStateEvents looks up the state events that are added by a new event.
func (s *OutputRoomEventConsumer) lookupStateEvents(
addsStateEventIDs []string, event *gomatrixserverlib.Event,
) ([]*gomatrixserverlib.Event, error) {
addsStateEventIDs []string, event gomatrixserverlib.PDU,
) ([]gomatrixserverlib.PDU, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
return nil, nil
}

// Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
return []*gomatrixserverlib.Event{event}, nil
return []gomatrixserverlib.PDU{event}, nil
}

missing := addsStateEventIDs
var result []*gomatrixserverlib.Event
var result []gomatrixserverlib.PDU

// Check if event itself is being added.
for _, eventID := range missing {
Expand All @@ -497,7 +497,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
}

for _, headeredEvent := range eventResp.Events {
result = append(result, headeredEvent.Event)
result = append(result, headeredEvent.PDU)
}

missing = missingEventsFrom(result, addsStateEventIDs)
Expand All @@ -511,7 +511,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
return result, nil
}

func missingEventsFrom(events []*gomatrixserverlib.Event, required []string) []string {
func missingEventsFrom(events []gomatrixserverlib.PDU, required []string) []string {
have := map[string]bool{}
for _, event := range events {
have[event.EventID()] = true
Expand Down
4 changes: 2 additions & 2 deletions federationapi/federationapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ func (f *fedClient) MakeJoin(ctx context.Context, origin, s spec.ServerName, roo
}
return
}
func (f *fedClient) SendJoin(ctx context.Context, origin, s spec.ServerName, event *gomatrixserverlib.Event) (res fclient.RespSendJoin, err error) {
func (f *fedClient) SendJoin(ctx context.Context, origin, s spec.ServerName, event gomatrixserverlib.PDU) (res fclient.RespSendJoin, err error) {
f.fedClientMutex.Lock()
defer f.fedClientMutex.Unlock()
for _, r := range f.allowJoins {
if r.ID == event.RoomID() {
r.InsertEvent(f.t, &types.HeaderedEvent{Event: event})
r.InsertEvent(f.t, &types.HeaderedEvent{PDU: event})
f.t.Logf("Join event: %v", event.EventID())
res.StateEvents = types.NewEventJSONsFromHeaderedEvents(r.CurrentState())
res.AuthEvents = types.NewEventJSONsFromHeaderedEvents(r.Events())
Expand Down
2 changes: 1 addition & 1 deletion federationapi/internal/federationclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (a *FederationInternalAPI) MakeJoin(
}

func (a *FederationInternalAPI) SendJoin(
ctx context.Context, origin, s spec.ServerName, event *gomatrixserverlib.Event,
ctx context.Context, origin, s spec.ServerName, event gomatrixserverlib.PDU,
) (res gomatrixserverlib.SendJoinResponse, err error) {
ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
Expand Down
10 changes: 5 additions & 5 deletions federationapi/internal/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
user.Domain(),
roomserverAPI.KindNew,
response.StateSnapshot,
&types.HeaderedEvent{Event: response.JoinEvent},
&types.HeaderedEvent{PDU: response.JoinEvent},
serverName,
nil,
false,
Expand Down Expand Up @@ -389,7 +389,7 @@ func (r *FederationInternalAPI) performOutboundPeekUsingServer(
StateEvents: gomatrixserverlib.NewEventJSONsFromEvents(stateEvents),
AuthEvents: gomatrixserverlib.NewEventJSONsFromEvents(authEvents),
},
&types.HeaderedEvent{Event: respPeek.LatestEvent},
&types.HeaderedEvent{PDU: respPeek.LatestEvent},
serverName,
nil,
false,
Expand Down Expand Up @@ -536,7 +536,7 @@ func (r *FederationInternalAPI) PerformInvite(
"destination": destination,
}).Info("Sending invite")

inviteReq, err := fclient.NewInviteV2Request(request.Event.Event, request.InviteRoomState)
inviteReq, err := fclient.NewInviteV2Request(request.Event.PDU, request.InviteRoomState)
if err != nil {
return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
}
Expand All @@ -554,7 +554,7 @@ func (r *FederationInternalAPI) PerformInvite(
if err != nil {
return fmt.Errorf("r.federation.SendInviteV2 failed to decode event response: %w", err)
}
response.Event = &types.HeaderedEvent{Event: inviteEvent}
response.Event = &types.HeaderedEvent{PDU: inviteEvent}
return nil
}

Expand Down Expand Up @@ -603,7 +603,7 @@ func (r *FederationInternalAPI) MarkServersAlive(destinations []spec.ServerName)
}
}

func checkEventsContainCreateEvent(events []*gomatrixserverlib.Event) error {
func checkEventsContainCreateEvent(events []gomatrixserverlib.PDU) error {
// sanity check we have a create event and it has a known room version
for _, ev := range events {
if ev.Type() == spec.MRoomCreate && ev.StateKeyEquals("") {
Expand Down
2 changes: 1 addition & 1 deletion federationapi/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func mustCreatePDU(t *testing.T) *types.HeaderedEvent {
if err != nil {
t.Fatalf("failed to create event: %v", err)
}
return &types.HeaderedEvent{Event: ev}
return &types.HeaderedEvent{PDU: ev}
}

func mustCreateEDU(t *testing.T) *gomatrixserverlib.EDU {
Expand Down
6 changes: 3 additions & 3 deletions federationapi/routing/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,18 @@ func Backfill(
}

// Filter any event that's not from the requested room out.
evs := make([]*gomatrixserverlib.Event, 0)
evs := make([]gomatrixserverlib.PDU, 0)

var ev *types.HeaderedEvent
for _, ev = range res.Events {
if ev.RoomID() == roomID {
evs = append(evs, ev.Event)
evs = append(evs, ev.PDU)
}
}

eventJSONs := []json.RawMessage{}
for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(
gomatrixserverlib.ToPDUs(evs),
evs,
gomatrixserverlib.TopologicalOrderByPrevEvents,
) {
eventJSONs = append(eventJSONs, e.JSON())
Expand Down
4 changes: 2 additions & 2 deletions federationapi/routing/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func allowedToSeeEvent(
}

// fetchEvent fetches the event without auth checks. Returns an error if the event cannot be found.
func fetchEvent(ctx context.Context, rsAPI api.FederationRoomserverAPI, roomID, eventID string) (*gomatrixserverlib.Event, *util.JSONResponse) {
func fetchEvent(ctx context.Context, rsAPI api.FederationRoomserverAPI, roomID, eventID string) (gomatrixserverlib.PDU, *util.JSONResponse) {
var eventsResponse api.QueryEventsByIDResponse
err := rsAPI.QueryEventsByID(
ctx,
Expand All @@ -99,5 +99,5 @@ func fetchEvent(ctx context.Context, rsAPI api.FederationRoomserverAPI, roomID,
}
}

return eventsResponse.Events[0].Event, nil
return eventsResponse.Events[0].PDU, nil
}
4 changes: 2 additions & 2 deletions federationapi/routing/invite.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func InviteV1(
func processInvite(
ctx context.Context,
isInviteV2 bool,
event *gomatrixserverlib.Event,
event gomatrixserverlib.PDU,
roomVer gomatrixserverlib.RoomVersion,
strippedState []fclient.InviteV2StrippedState,
roomID string,
Expand Down Expand Up @@ -198,7 +198,7 @@ func processInvite(
)

// Add the invite event to the roomserver.
inviteEvent := &types.HeaderedEvent{Event: &signedEvent}
inviteEvent := &types.HeaderedEvent{PDU: signedEvent}
request := &api.PerformInviteRequest{
Event: inviteEvent,
InviteRoomState: strippedState,
Expand Down
Loading

0 comments on commit f5b3144

Please sign in to comment.