Skip to content

Commit

Permalink
tests: Test separate http port connection multiplexing
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Mar 30, 2023
1 parent bf12179 commit 65add8c
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 47 deletions.
41 changes: 27 additions & 14 deletions tests/e2e/cmux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ import (
func TestConnectionMultiplexing(t *testing.T) {
e2e.BeforeTest(t)
for _, tc := range []struct {
name string
serverTLS e2e.ClientConnType
name string
serverTLS e2e.ClientConnType
separateHttpPort bool
}{
{
name: "ServerTLS",
Expand All @@ -54,10 +55,20 @@ func TestConnectionMultiplexing(t *testing.T) {
name: "ServerTLSAndNonTLS",
serverTLS: e2e.ClientTLSAndNonTLS,
},
{
name: "SeparateHTTP/ServerTLS",
serverTLS: e2e.ClientTLS,
separateHttpPort: true,
},
{
name: "SeparateHTTP/ServerNonTLS",
serverTLS: e2e.ClientNonTLS,
separateHttpPort: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
cfg := e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: tc.serverTLS}}
cfg := e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: tc.serverTLS}, ClientHttpSeparate: tc.separateHttpPort}
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&cfg))
require.NoError(t, err)
defer clus.Close()
Expand All @@ -78,30 +89,32 @@ func TestConnectionMultiplexing(t *testing.T) {
name = "ClientTLS"
}
t.Run(name, func(t *testing.T) {
testConnectionMultiplexing(t, ctx, clus.EndpointsV3()[0], clientTLS)
testConnectionMultiplexing(t, ctx, clus.Procs[0], clientTLS)
})
}
})
}

}

func testConnectionMultiplexing(t *testing.T, ctx context.Context, endpoint string, connType e2e.ClientConnType) {
func testConnectionMultiplexing(t *testing.T, ctx context.Context, member e2e.EtcdProcess, connType e2e.ClientConnType) {
httpEndpoint := member.EndpointsHTTP()[0]
grpcEndpoint := member.EndpointsGRPC()[0]
switch connType {
case e2e.ClientTLS:
endpoint = e2e.ToTLS(endpoint)
httpEndpoint = e2e.ToTLS(httpEndpoint)
grpcEndpoint = e2e.ToTLS(grpcEndpoint)
case e2e.ClientNonTLS:
default:
panic(fmt.Sprintf("Unsupported conn type %v", connType))
}
t.Run("etcdctl", func(t *testing.T) {
etcdctl, err := e2e.NewEtcdctl(e2e.ClientConfig{ConnectionType: connType}, []string{endpoint})
etcdctl, err := e2e.NewEtcdctl(e2e.ClientConfig{ConnectionType: connType}, []string{grpcEndpoint})
require.NoError(t, err)
_, err = etcdctl.Get(ctx, "a", config.GetOptions{})
assert.NoError(t, err)
})
t.Run("clientv3", func(t *testing.T) {
c := newClient(t, []string{endpoint}, e2e.ClientConfig{ConnectionType: connType})
c := newClient(t, []string{grpcEndpoint}, e2e.ClientConfig{ConnectionType: connType})
_, err := c.Get(ctx, "a")
assert.NoError(t, err)
})
Expand All @@ -112,11 +125,11 @@ func testConnectionMultiplexing(t *testing.T, ctx context.Context, endpoint stri
tname = "default"
}
t.Run(tname, func(t *testing.T) {
assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(endpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(endpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType))
assert.NoError(t, fetchGrpcGateway(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(httpEndpoint, httpVersion, connType))
})
}
})
Expand Down
71 changes: 45 additions & 26 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,23 +458,20 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer

func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig {
var curls []string
var curl, curltls string
var curl string
port := cfg.BasePort + 5*i
clientPort := port
peerPort := port + 1
metricsPort := port + 2
peer2Port := port + 3
clientHttpPort := port + 4

curlHost := fmt.Sprintf("localhost:%d", clientPort)
switch cfg.Client.ConnectionType {
case ClientNonTLS, ClientTLS:
curl = (&url.URL{Scheme: cfg.ClientScheme(), Host: curlHost}).String()
if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(clientPort, ClientNonTLS)
curls = []string{curl, clientURL(clientPort, ClientTLS)}
} else {
curl = clientURL(clientPort, cfg.Client.ConnectionType)
curls = []string{curl}
case ClientTLSAndNonTLS:
curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
curls = []string{curl, curltls}
}

peerListenUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
Expand Down Expand Up @@ -513,9 +510,10 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount),
}
var clientHttpUrl string
if cfg.ClientHttpSeparate {
clientHttpUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", clientHttpPort)}
args = append(args, "--listen-client-http-urls", clientHttpUrl.String())
clientHttpUrl = clientURL(clientHttpPort, cfg.Client.ConnectionType)
args = append(args, "--listen-client-http-urls", clientHttpUrl)
}

if cfg.ForceNewCluster {
Expand Down Expand Up @@ -625,21 +623,34 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
}

return &EtcdServerProcessConfig{
lg: cfg.Logger,
ExecPath: execPath,
Args: args,
EnvVars: envVars,
TlsArgs: cfg.TlsArgs(),
Client: cfg.Client,
DataDirPath: dataDirPath,
KeepDataDir: cfg.KeepDataDir,
Name: name,
PeerURL: peerAdvertiseUrl,
ClientURL: curl,
MetricsURL: murl,
InitialToken: cfg.InitialToken,
GoFailPort: gofailPort,
Proxy: proxyCfg,
lg: cfg.Logger,
ExecPath: execPath,
Args: args,
EnvVars: envVars,
TlsArgs: cfg.TlsArgs(),
Client: cfg.Client,
DataDirPath: dataDirPath,
KeepDataDir: cfg.KeepDataDir,
Name: name,
PeerURL: peerAdvertiseUrl,
ClientURL: curl,
ClientHTTPURL: clientHttpUrl,
MetricsURL: murl,
InitialToken: cfg.InitialToken,
GoFailPort: gofailPort,
Proxy: proxyCfg,
}
}

func clientURL(port int, connType ClientConnType) string {
curlHost := fmt.Sprintf("localhost:%d", port)
switch connType {
case ClientNonTLS:
return (&url.URL{Scheme: "http", Host: curlHost}).String()
case ClientTLS:
return (&url.URL{Scheme: "https", Host: curlHost}).String()
default:
panic(fmt.Sprintf("Unsupported connection type %v", connType))
}
}

Expand Down Expand Up @@ -693,6 +704,14 @@ func (epc *EtcdProcessCluster) EndpointsV3() []string {
return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsV3() })
}

func (epc *EtcdProcessCluster) EndpointsGRPC() []string {
return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsGRPC() })
}

func (epc *EtcdProcessCluster) EndpointsHTTP() []string {
return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsHTTP() })
}

func (epc *EtcdProcessCluster) Endpoints(f func(ep EtcdProcess) []string) (ret []string) {
for _, p := range epc.Procs {
ret = append(ret, f(p)...)
Expand Down
6 changes: 4 additions & 2 deletions tests/framework/e2e/cluster_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ func NewProxyEtcdProcess(cfg *EtcdServerProcessConfig) (*proxyEtcdProcess, error

func (p *proxyEtcdProcess) Config() *EtcdServerProcessConfig { return p.etcdProc.Config() }

func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() }
func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsGRPC() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) EndpointsMetrics() []string {
panic("not implemented; proxy doesn't provide health information")
}
Expand Down
20 changes: 15 additions & 5 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ var (
type EtcdProcess interface {
EndpointsV2() []string
EndpointsV3() []string
EndpointsGRPC() []string
EndpointsHTTP() []string
EndpointsMetrics() []string
Client(opts ...config.ClientOption) *EtcdctlV3

Expand Down Expand Up @@ -86,9 +88,10 @@ type EtcdServerProcessConfig struct {

Name string

PeerURL url.URL
ClientURL string
MetricsURL string
PeerURL url.URL
ClientURL string
ClientHTTPURL string
MetricsURL string

InitialToken string
InitialCluster string
Expand All @@ -113,8 +116,15 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err
return ep, nil
}

func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.ClientURL} }
func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
func (ep *EtcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() }
func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsGRPC() }
func (ep *EtcdServerProcess) EndpointsGRPC() []string { return []string{ep.cfg.ClientURL} }
func (ep *EtcdServerProcess) EndpointsHTTP() []string {
if ep.cfg.ClientHTTPURL == "" {
return []string{ep.cfg.ClientURL}
}
return []string{ep.cfg.ClientHTTPURL}
}
func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.MetricsURL} }

func (epc *EtcdServerProcess) Client(opts ...config.ClientOption) *EtcdctlV3 {
Expand Down

0 comments on commit 65add8c

Please sign in to comment.