Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove BaseDendrite #3023

Merged
merged 24 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
93d0fba
Remove NATS from BaseDendrite
S7evinK Mar 20, 2023
75ba4ec
Remove BaseDendrite from appservice
S7evinK Mar 20, 2023
f485c35
Remove BaseDendrite from media API
S7evinK Mar 20, 2023
1fd3b28
Remove BaseDendrite from client API
S7evinK Mar 20, 2023
16fb22c
Remove BaseDendrite from federation API
S7evinK Mar 20, 2023
b7947c3
Remove BaseDendrite from roomserver
S7evinK Mar 20, 2023
5ef86d9
Remove BaseDendrite from relay API
S7evinK Mar 20, 2023
72d4833
Remove BaseDendrite from SyncAPI
S7evinK Mar 20, 2023
1d2429c
Remove BaseDendrite from userAPI
S7evinK Mar 20, 2023
fffd2d9
Test updates, I guess
S7evinK Mar 20, 2023
edb5b4b
Let CreateConfig return a ProcessContext
S7evinK Mar 20, 2023
4c77ff3
Remove BaseDendrite from tests
S7evinK Mar 20, 2023
d4db4ed
Add ProcessContext to ConnectionManager to cleanly shut down database
S7evinK Mar 20, 2023
9b06041
Remove BaseDendrite from relay API
S7evinK Mar 21, 2023
c673899
Remove BaseDendrite from syncAPI tests
S7evinK Mar 21, 2023
b9b1838
Almost remove every BaseDendrite from federation API tests
S7evinK Mar 21, 2023
85f76d7
Remove BaseDendrite from almost all roomserver tests
S7evinK Mar 21, 2023
b5789f2
BaseDendrite be gone!
S7evinK Mar 21, 2023
e0096fe
Linter
S7evinK Mar 21, 2023
1c66854
Fix race condition
S7evinK Mar 21, 2023
66092fc
Another try to fix a race condition
S7evinK Mar 21, 2023
ae32587
Another try?
S7evinK Mar 21, 2023
761d777
Merge branch 'main' into s7evink/baserefactor
S7evinK Mar 22, 2023
a09828b
Retry getting tags
S7evinK Mar 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions appservice/appservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sync"
"time"

"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/sirupsen/logrus"

"github.com/matrix-org/gomatrixserverlib"
Expand All @@ -29,15 +31,16 @@ import (
"github.com/matrix-org/dendrite/appservice/consumers"
"github.com/matrix-org/dendrite/appservice/query"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api"
)

// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
base *base.BaseDendrite,
processContext *process.ProcessContext,
cfg *config.Dendrite,
natsInstance *jetstream.NATSInstance,
userAPI userapi.AppserviceUserAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
) appserviceAPI.AppServiceInternalAPI {
Expand All @@ -46,7 +49,7 @@ func NewInternalAPI(
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: base.Cfg.AppServiceAPI.DisableTLSValidation,
InsecureSkipVerify: cfg.AppServiceAPI.DisableTLSValidation,
},
Proxy: http.ProxyFromEnvironment,
},
Expand All @@ -55,21 +58,21 @@ func NewInternalAPI(
// outbound and inbound requests (inbound only for the internal API)
appserviceQueryAPI := &query.AppServiceQueryAPI{
HTTPClient: client,
Cfg: &base.Cfg.AppServiceAPI,
Cfg: &cfg.AppServiceAPI,
ProtocolCache: map[string]appserviceAPI.ASProtocolResponse{},
CacheMu: sync.Mutex{},
}

if len(base.Cfg.Derived.ApplicationServices) == 0 {
if len(cfg.Derived.ApplicationServices) == 0 {
return appserviceQueryAPI
}

// Wrap application services in a type that relates the application service and
// a sync.Cond object that can be used to notify workers when there are new
// events to be sent out.
for _, appservice := range base.Cfg.Derived.ApplicationServices {
for _, appservice := range cfg.Derived.ApplicationServices {
// Create bot account for this AS if it doesn't already exist
if err := generateAppServiceAccount(userAPI, appservice, base.Cfg.Global.ServerName); err != nil {
if err := generateAppServiceAccount(userAPI, appservice, cfg.Global.ServerName); err != nil {
logrus.WithFields(logrus.Fields{
"appservice": appservice.ID,
}).WithError(err).Panicf("failed to generate bot account for appservice")
Expand All @@ -78,9 +81,9 @@ func NewInternalAPI(

// Only consume if we actually have ASes to track, else we'll just chew cycles needlessly.
// We can't add ASes at runtime so this is safe to do.
js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
js, _ := natsInstance.Prepare(processContext, &cfg.Global.JetStream)
consumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, &base.Cfg.AppServiceAPI,
processContext, &cfg.AppServiceAPI,
client, js, rsAPI,
)
if err := consumer.Start(); err != nil {
Expand Down
23 changes: 16 additions & 7 deletions appservice/appservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import (
"regexp"
"strings"
"testing"
"time"

"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi"

Expand Down Expand Up @@ -105,11 +108,11 @@ func TestAppserviceInternalAPI(t *testing.T) {
}

test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, closeBase := testrig.CreateBaseDendrite(t, dbType)
defer closeBase()
cfg, ctx, close := testrig.CreateConfig(t, dbType)
defer close()

// Create a dummy application service
base.Cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{
cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{
{
ID: "someID",
URL: srv.URL,
Expand All @@ -124,11 +127,17 @@ func TestAppserviceInternalAPI(t *testing.T) {
},
}

caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
t.Cleanup(func() {
ctx.ShutdownDendrite()
ctx.WaitForShutdown()
})
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
// Create required internal APIs
rsAPI := roomserver.NewInternalAPI(base, caches)
usrAPI := userapi.NewInternalAPI(base, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base, usrAPI, rsAPI)
natsInstance := jetstream.NATSInstance{}
cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil)
asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI)

runCases(t, asAPI)
})
Expand Down
40 changes: 21 additions & 19 deletions build/dendritejs-pinecone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ import (
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/userapi"

"github.com/matrix-org/gomatrixserverlib"
Expand Down Expand Up @@ -158,9 +160,8 @@ func startup() {
pManager.AddPeer("wss://pinecone.matrix.org/public")

cfg := &config.Dendrite{}
cfg.Defaults(true)
cfg.Defaults(config.DefaultOpts{Generate: true, SingleDatabase: false})
cfg.UserAPI.AccountDatabase.ConnectionString = "file:/idb/dendritejs_account.db"
cfg.AppServiceAPI.Database.ConnectionString = "file:/idb/dendritejs_appservice.db"
cfg.FederationAPI.Database.ConnectionString = "file:/idb/dendritejs_fedsender.db"
cfg.MediaAPI.Database.ConnectionString = "file:/idb/dendritejs_mediaapi.db"
cfg.RoomServer.Database.ConnectionString = "file:/idb/dendritejs_roomserver.db"
Expand All @@ -177,29 +178,30 @@ func startup() {
if err := cfg.Derive(); err != nil {
logrus.Fatalf("Failed to derive values from config: %s", err)
}
base := base.NewBaseDendrite(cfg)
defer base.Close() // nolint: errcheck
natsInstance := jetstream.NATSInstance{}
processCtx := process.NewProcessContext()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
routers := httputil.NewRouters()
caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.EnableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.EnableMetrics)

rsAPI := roomserver.NewInternalAPI(base)

federation := conn.CreateFederationClient(base, pSessions)
federation := conn.CreateFederationClient(cfg, pSessions)

serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()

userAPI := userapi.NewInternalAPI(base, rsAPI, federation)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation)

asQuery := appservice.NewInternalAPI(
base, userAPI, rsAPI,
processCtx, cfg, &natsInstance, userAPI, rsAPI,
)
rsAPI.SetAppserviceAPI(asQuery)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics)
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, caches, keyRing, true)
fedSenderAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true)
rsAPI.SetFederationAPI(fedSenderAPI, keyRing)

monolith := setup.Monolith{
Config: base.Cfg,
Client: conn.CreateClient(base, pSessions),
Config: cfg,
Client: conn.CreateClient(pSessions),
FedClient: federation,
KeyRing: keyRing,

Expand All @@ -210,15 +212,15 @@ func startup() {
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation),
}
monolith.AddAllPublicRoutes(base, caches)
monolith.AddAllPublicRoutes(processCtx, cfg, routers, cm, &natsInstance, caches, caching.EnableMetrics)

httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.Routers.Client)
httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.Routers.Media)
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(routers.Client)
httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(routers.Media)

p2pRouter := pSessions.Protocol("matrix").HTTP().Mux()
p2pRouter.Handle(httputil.PublicFederationPathPrefix, base.Routers.Federation)
p2pRouter.Handle(httputil.PublicMediaPathPrefix, base.Routers.Media)
p2pRouter.Handle(httputil.PublicFederationPathPrefix, routers.Federation)
p2pRouter.Handle(httputil.PublicMediaPathPrefix, routers.Media)

// Expose the matrix APIs via fetch - for local traffic
go func() {
Expand Down
13 changes: 10 additions & 3 deletions build/gobind-pinecone/monolith.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/relay"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/process"
userapiAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/pinecone/types"
Expand Down Expand Up @@ -187,7 +190,7 @@ func (m *DendriteMonolith) SetRelayServers(nodeID string, uris string) {
relay.UpdateNodeRelayServers(
gomatrixserverlib.ServerName(nodeKey),
relays,
m.p2pMonolith.BaseDendrite.Context(),
m.p2pMonolith.ProcessCtx.Context(),
m.p2pMonolith.GetFederationAPI(),
)
}
Expand All @@ -214,7 +217,7 @@ func (m *DendriteMonolith) GetRelayServers(nodeID string) string {
} else {
request := api.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(nodeKey)}
response := api.P2PQueryRelayServersResponse{}
err := m.p2pMonolith.GetFederationAPI().P2PQueryRelayServers(m.p2pMonolith.BaseDendrite.Context(), &request, &response)
err := m.p2pMonolith.GetFederationAPI().P2PQueryRelayServers(m.p2pMonolith.ProcessCtx.Context(), &request, &response)
if err != nil {
logrus.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error())
return ""
Expand Down Expand Up @@ -346,10 +349,14 @@ func (m *DendriteMonolith) Start() {
// This isn't actually fixed: https://github.com/blevesearch/zapx/pull/147
cfg.SyncAPI.Fulltext.Enabled = false

processCtx := process.NewProcessContext()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
routers := httputil.NewRouters()

enableRelaying := false
enableMetrics := false
enableWebsockets := false
m.p2pMonolith.SetupDendrite(cfg, 65432, enableRelaying, enableMetrics, enableWebsockets)
m.p2pMonolith.SetupDendrite(processCtx, cfg, cm, routers, 65432, enableRelaying, enableMetrics, enableWebsockets)
m.p2pMonolith.StartMonolith()
}

Expand Down
Loading