Skip to content

Commit

Permalink
app service unix socket support
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberb committed Mar 21, 2023
1 parent 0459d2b commit 388886d
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 50 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ complement/
docs/_site

media_store/
build
23 changes: 4 additions & 19 deletions appservice/appservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@ package appservice

import (
"context"
"crypto/tls"
"net/http"
"sync"
"time"

"github.com/sirupsen/logrus"

"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"sync"

appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/appservice/consumers"
Expand All @@ -41,20 +36,10 @@ func NewInternalAPI(
userAPI userapi.AppserviceUserAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
) appserviceAPI.AppServiceInternalAPI {
client := &http.Client{
Timeout: time.Second * 30,
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: base.Cfg.AppServiceAPI.DisableTLSValidation,
},
Proxy: http.ProxyFromEnvironment,
},
}

// Create appserivce query API with an HTTP client that will be used for all
// outbound and inbound requests (inbound only for the internal API)
appserviceQueryAPI := &query.AppServiceQueryAPI{
HTTPClient: client,
Cfg: &base.Cfg.AppServiceAPI,
ProtocolCache: map[string]appserviceAPI.ASProtocolResponse{},
CacheMu: sync.Mutex{},
Expand All @@ -81,7 +66,7 @@ func NewInternalAPI(
js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
consumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, &base.Cfg.AppServiceAPI,
client, js, rsAPI,
js, rsAPI,
)
if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
Expand Down
24 changes: 12 additions & 12 deletions appservice/appservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,20 @@ func TestAppserviceInternalAPI(t *testing.T) {
defer closeBase()

// Create a dummy application service
base.Cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{
{
ID: "someID",
URL: srv.URL,
ASToken: "",
HSToken: "",
SenderLocalpart: "senderLocalPart",
NamespaceMap: map[string][]config.ApplicationServiceNamespace{
"users": {{RegexpObject: regexp.MustCompile("as-.*")}},
"aliases": {{RegexpObject: regexp.MustCompile("asroom-.*")}},
},
Protocols: []string{existingProtocol},
as := &config.ApplicationService{
ID: "someID",
URL: srv.URL,
ASToken: "",
HSToken: "",
SenderLocalpart: "senderLocalPart",
NamespaceMap: map[string][]config.ApplicationServiceNamespace{
"users": {{RegexpObject: regexp.MustCompile("as-.*")}},
"aliases": {{RegexpObject: regexp.MustCompile("asroom-.*")}},
},
Protocols: []string{existingProtocol},
}
as.HTTPClient = as.CreateHTTPClient(base.Cfg.AppServiceAPI.DisableTLSValidation)
base.Cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as}

caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
// Create required internal APIs
Expand Down
9 changes: 3 additions & 6 deletions appservice/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
type OutputRoomEventConsumer struct {
ctx context.Context
cfg *config.AppServiceAPI
client *http.Client
jetstream nats.JetStreamContext
topic string
rsAPI api.AppserviceRoomserverAPI
Expand All @@ -56,14 +55,12 @@ type appserviceState struct {
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.AppServiceAPI,
client *http.Client,
js nats.JetStreamContext,
rsAPI api.AppserviceRoomserverAPI,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
cfg: cfg,
client: client,
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
rsAPI: rsAPI,
Expand Down Expand Up @@ -195,13 +192,13 @@ func (s *OutputRoomEventConsumer) sendEvents(

// Send the transaction to the appservice.
// https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
address := fmt.Sprintf("%s/transactions/%s?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
address := fmt.Sprintf("%s/transactions/%s?access_token=%s", state.RequestUrl(), txnID, url.QueryEscape(state.HSToken))
req, err := http.NewRequestWithContext(ctx, "PUT", address, bytes.NewBuffer(transaction))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := s.client.Do(req)
resp, err := state.HTTPClient.Do(req)
if err != nil {
return state.backoffAndPause(err)
}
Expand All @@ -212,7 +209,7 @@ func (s *OutputRoomEventConsumer) sendEvents(
case http.StatusOK:
state.backoff = 0
default:
return state.backoffAndPause(fmt.Errorf("received HTTP status code %d from appservice", resp.StatusCode))
return state.backoffAndPause(fmt.Errorf("received HTTP status code %d from appservice url %s", resp.StatusCode, address))
}
return nil
}
Expand Down
21 changes: 10 additions & 11 deletions appservice/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const userIDExistsPath = "/users/"

// AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI
type AppServiceQueryAPI struct {
HTTPClient *http.Client
Cfg *config.AppServiceAPI
ProtocolCache map[string]api.ASProtocolResponse
CacheMu sync.Mutex
Expand All @@ -57,7 +56,7 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
for _, appservice := range a.Cfg.Derived.ApplicationServices {
if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) {
// The full path to the rooms API, includes hs token
URL, err := url.Parse(appservice.URL + roomAliasExistsPath)
URL, err := url.Parse(appservice.RequestUrl() + roomAliasExistsPath)
if err != nil {
return err
}
Expand All @@ -73,7 +72,7 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
}
req = req.WithContext(ctx)

resp, err := a.HTTPClient.Do(req)
resp, err := appservice.HTTPClient.Do(req)
if resp != nil {
defer func() {
err = resp.Body.Close()
Expand Down Expand Up @@ -124,7 +123,7 @@ func (a *AppServiceQueryAPI) UserIDExists(
for _, appservice := range a.Cfg.Derived.ApplicationServices {
if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) {
// The full path to the rooms API, includes hs token
URL, err := url.Parse(appservice.URL + userIDExistsPath)
URL, err := url.Parse(appservice.RequestUrl() + userIDExistsPath)
if err != nil {
return err
}
Expand All @@ -137,7 +136,7 @@ func (a *AppServiceQueryAPI) UserIDExists(
if err != nil {
return err
}
resp, err := a.HTTPClient.Do(req.WithContext(ctx))
resp, err := appservice.HTTPClient.Do(req.WithContext(ctx))
if resp != nil {
defer func() {
err = resp.Body.Close()
Expand Down Expand Up @@ -212,12 +211,12 @@ func (a *AppServiceQueryAPI) Locations(
var asLocations []api.ASLocationResponse
params.Set("access_token", as.HSToken)

url := as.URL + api.ASLocationPath
url := as.RequestUrl() + api.ASLocationPath
if req.Protocol != "" {
url += "/" + req.Protocol
}

if err := requestDo[[]api.ASLocationResponse](a.HTTPClient, url+"?"+params.Encode(), &asLocations); err != nil {
if err := requestDo[[]api.ASLocationResponse](as.HTTPClient, url+"?"+params.Encode(), &asLocations); err != nil {
log.WithError(err).Error("unable to get 'locations' from application service")
continue
}
Expand Down Expand Up @@ -247,12 +246,12 @@ func (a *AppServiceQueryAPI) User(
var asUsers []api.ASUserResponse
params.Set("access_token", as.HSToken)

url := as.URL + api.ASUserPath
url := as.RequestUrl() + api.ASUserPath
if req.Protocol != "" {
url += "/" + req.Protocol
}

if err := requestDo[[]api.ASUserResponse](a.HTTPClient, url+"?"+params.Encode(), &asUsers); err != nil {
if err := requestDo[[]api.ASUserResponse](as.HTTPClient, url+"?"+params.Encode(), &asUsers); err != nil {
log.WithError(err).Error("unable to get 'user' from application service")
continue
}
Expand Down Expand Up @@ -290,7 +289,7 @@ func (a *AppServiceQueryAPI) Protocols(
response := api.ASProtocolResponse{}
for _, as := range a.Cfg.Derived.ApplicationServices {
var proto api.ASProtocolResponse
if err := requestDo[api.ASProtocolResponse](a.HTTPClient, as.URL+api.ASProtocolPath+req.Protocol, &proto); err != nil {
if err := requestDo[api.ASProtocolResponse](as.HTTPClient, as.RequestUrl()+api.ASProtocolPath+req.Protocol, &proto); err != nil {
log.WithError(err).Error("unable to get 'protocol' from application service")
continue
}
Expand Down Expand Up @@ -320,7 +319,7 @@ func (a *AppServiceQueryAPI) Protocols(
for _, as := range a.Cfg.Derived.ApplicationServices {
for _, p := range as.Protocols {
var proto api.ASProtocolResponse
if err := requestDo[api.ASProtocolResponse](a.HTTPClient, as.URL+api.ASProtocolPath+p, &proto); err != nil {
if err := requestDo[api.ASProtocolResponse](as.HTTPClient, as.RequestUrl()+api.ASProtocolPath+p, &proto); err != nil {
log.WithError(err).Error("unable to get 'protocol' from application service")
continue
}
Expand Down
48 changes: 46 additions & 2 deletions setup/config/config_appservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@
package config

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"regexp"
"strings"
"time"

log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)

const UnixSocketPrefix = "unix://"

type AppServiceAPI struct {
Matrix *Global `yaml:"-"`
Derived *Derived `yaml:"-"` // TODO: Nuke Derived from orbit
Expand Down Expand Up @@ -80,7 +87,44 @@ type ApplicationService struct {
// Whether rate limiting is applied to each application service user
RateLimited bool `yaml:"rate_limited"`
// Any custom protocols that this application service provides (e.g. IRC)
Protocols []string `yaml:"protocols"`
Protocols []string `yaml:"protocols"`
HTTPClient *http.Client
}

func (a *ApplicationService) CreateHTTPClient(insecureSkipVerify bool) *http.Client {
client := &http.Client{Timeout: time.Second * 30}
if a.IsUnixSocketUrl() {
client.Transport = &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", a.UnixSocket())
},
}
} else {
client.Transport = &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: insecureSkipVerify,
},
Proxy: http.ProxyFromEnvironment,
}
}
return client
}

func (a *ApplicationService) IsUnixSocketUrl() bool {
return strings.HasPrefix(a.URL, UnixSocketPrefix)
}

func (a *ApplicationService) UnixSocket() string {
return strings.TrimPrefix(a.URL, UnixSocketPrefix)
}

func (a *ApplicationService) RequestUrl() string {
if a.IsUnixSocketUrl() {
return "http://unix"
} else {
return a.URL
}
}

// IsInterestedInRoomID returns a bool on whether an application service's
Expand Down Expand Up @@ -172,7 +216,7 @@ func loadAppServices(config *AppServiceAPI, derived *Derived) error {
if err = yaml.Unmarshal(configData, &appservice); err != nil {
return err
}

appservice.HTTPClient = appservice.CreateHTTPClient(config.DisableTLSValidation)
// Append the parsed application service to the global config
derived.ApplicationServices = append(
derived.ApplicationServices, appservice,
Expand Down

0 comments on commit 388886d

Please sign in to comment.