Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce ssh upstream timeout #260

Merged
merged 4 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ftests/ftests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (c *Client) JoinWithContext(ctx context.Context, session *api.GetSessionRes
}

if u.Scheme == "ws" || u.Scheme == "wss" {
encodedNodeAddr := base64.StdEncoding.EncodeToString([]byte(session.NodeAddr))
encodedNodeAddr := base64.URLEncoding.EncodeToString([]byte(session.NodeAddr))
u, _ = url.Parse(u.String())
u.User = url.UserPassword(session.SessionId, encodedNodeAddr)
c.sshClient, err = ws.NewSSHClient(u, config, true)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ require (
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace golang.org/x/crypto => github.com/tg123/sshpiper.crypto v0.22.0-sshpiper-20240413
replace golang.org/x/crypto => github.com/tg123/sshpiper.crypto v0.23.0-sshpiper-20240508
9 changes: 4 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
github.com/stripe/stripe-go v70.15.0+incompatible/go.mod h1:A1dQZmO/QypXmsL0T8axYZkSN/uA/T/A64pfKdBAMiY=
github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af h1:6yITBqGTE2lEeTPG04SN9W+iWHCRyHqlVYILiSXziwk=
github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af/go.mod h1:4F09kP5F+am0jAwlQLddpoMDM+iewkxxt6nxUQ5nq5o=
github.com/tg123/sshpiper.crypto v0.22.0-sshpiper-20240413 h1:fqKuFKtWQKOtx0k6PrnAhcz5sI2P8y+DTlG4MjMuj0k=
github.com/tg123/sshpiper.crypto v0.22.0-sshpiper-20240413/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
github.com/tg123/sshpiper.crypto v0.23.0-sshpiper-20240508 h1:TF3Ca6LSU+c/aXl1aT/bTjNUKQyBROjPs5HFz4GMHw0=
github.com/tg123/sshpiper.crypto v0.23.0-sshpiper-20240508/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
github.com/thlib/go-timezone-local v0.0.0-20210907160436-ef149e42d28e h1:BuzhfgfWQbX0dWzYzT1zsORLnHRv3bcRcsaUk0VmXA8=
github.com/thlib/go-timezone-local v0.0.0-20210907160436-ef149e42d28e/go.mod h1:/Tnicc6m/lsJE0irFMA0LfIwTBo4QP7A8IfyIv4zZKI=
github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0=
Expand Down Expand Up @@ -262,21 +262,20 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
Expand Down
30 changes: 16 additions & 14 deletions host/api/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"encoding/base64"
"fmt"
"strings"

"github.com/owenthereal/upterm/upterm"
Expand All @@ -27,27 +28,28 @@ func EncodeIdentifier(id *Identifier) (string, error) {
}

func DecodeIdentifier(id, clientVersion string) (*Identifier, error) {
t := Identifier_HOST
if clientVersion != upterm.HostSSHClientVersion {
t = Identifier_CLIENT
// host
if clientVersion == upterm.HostSSHClientVersion {
return &Identifier{
Id: id,
Type: Identifier_HOST,
}, nil
}

// client
split := strings.SplitN(id, ":", 2)
var (
nodeAddr []byte
err error
)

if len(split) == 2 {
nodeAddr, err = base64.URLEncoding.DecodeString(split[1])
if err != nil {
return nil, err
}
if len(split) != 2 {
return nil, fmt.Errorf("invalid client session id: %s", id)
}

nodeAddr, err := base64.URLEncoding.DecodeString(split[1])
if err != nil {
return nil, err
}

return &Identifier{
Id: split[0],
Type: t,
Type: Identifier_CLIENT,
NodeAddr: string(nodeAddr),
}, nil
}
27 changes: 18 additions & 9 deletions host/api/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/owenthereal/upterm/upterm"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -37,20 +38,28 @@ func Test_EncodeDecodeIdentifier(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
t.Parallel()

assert := assert.New(t)

want := c.id
str, err := EncodeIdentifier(want)
if err != nil {
t.Fatal(err)
}
assert.NoError(err)

got, err := DecodeIdentifier(str, c.clientVersion)
if err != nil {
t.Fatal(err)
}
assert.NoError(err)

if !proto.Equal(want, got) {
t.Errorf("Encode/decode failed, want=%s got=%s", want, got)
}
assert.True(proto.Equal(want, got))
})
}
}

func TestDecodeIdentifier(t *testing.T) {
assert := assert.New(t)

_, err := DecodeIdentifier("10OLFAKZu4cxx2roOboaY:MTI3LjAuMC4xOjIyMjIIII=", "")
assert.Error(err)
assert.ErrorContains(err, "illegal base64 data")

_, err = DecodeIdentifier("10OLFAKZu4cxx2roOboaY", "")
assert.Error(err)
assert.ErrorContains(err, "invalid client session id")
}
41 changes: 27 additions & 14 deletions server/sshrouting.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/provider"
"github.com/owenthereal/upterm/host/api"
libmetrics "github.com/owenthereal/upterm/metrics"
"github.com/owenthereal/upterm/upterm"
log "github.com/sirupsen/logrus"
Expand All @@ -16,7 +17,7 @@ import (

var (
ErrListnerClosed = errors.New("routing: listener closed")
pipeEstablishingTimeout = 60 * time.Second
pipeEstablishingTimeout = 3 * time.Second
)

type SSHRouting struct {
Expand Down Expand Up @@ -53,20 +54,31 @@ func (p *SSHRouting) Serve(ln net.Listener) error {
p.listener = ln
p.mux.Unlock()

piper := &ssh.PiperConfig{
piperCfg := &ssh.PiperConfig{
PublicKeyCallback: p.AuthPiper.PublicKeyCallback,
ServerVersion: upterm.ServerSSHServerVersion,
}
NextAuthMethods: func(conn ssh.ConnMetadata, challengeCtx ssh.ChallengeContext) ([]string, error) {
// Fail early if the user is not a valid identifier.
user := conn.User()
if user != "" {
_, err := api.DecodeIdentifier(user, string(conn.ClientVersion()))
if err != nil {
return nil, err
}
}

return []string{"publickey"}, nil
},
}
for _, s := range p.HostSigners {
piper.AddHostKey(s)
piperCfg.AddHostKey(s)
}

inst := newSSHRoutingInstruments(p.MetricsProvider)

var tempDelay time.Duration // how long to sleep on accept failure
for {
conn, err := ln.Accept()
dconn, err := ln.Accept()
if err != nil {
select {
case <-p.getDoneChan():
Expand Down Expand Up @@ -95,9 +107,9 @@ func (p *SSHRouting) Serve(ln net.Listener) error {

tempDelay = 0

logger := p.Logger.WithField("addr", conn.RemoteAddr())
go func(c net.Conn, inst *routingInstruments, logger log.FieldLogger) {
defer c.Close()
logger := p.Logger.WithField("addr", dconn.RemoteAddr())
go func(dconn net.Conn, inst *routingInstruments, logger log.FieldLogger) {
defer dconn.Close()

defer libmetrics.MeasureSince(inst.connectionDuration, time.Now())
defer inst.activeConnections.Add(-1)
Expand All @@ -113,20 +125,21 @@ func (p *SSHRouting) Serve(ln net.Listener) error {
close(errorc)
}()

p, err := ssh.NewSSHPiperConn(c, piper)
logger.Info("establishing ssh piper connection")
pconn, err := ssh.NewSSHPiperConn(dconn, piperCfg)
if err != nil {
errorc <- err
return
}

pipec <- p
pipec <- pconn
}()

select {
case pc := <-pipec:
defer pc.Close()
case pconn := <-pipec:
defer pconn.Close()

if err := pc.Wait(); err != nil {
if err := pconn.Wait(); err != nil {
logger.WithError(err).Debug("error waiting for pipe")
inst.errors.Add(1)
}
Expand All @@ -137,7 +150,7 @@ func (p *SSHRouting) Serve(ln net.Listener) error {
logger.Debug("pipe establishing timeout")
inst.connectionTimeouts.Add(1)
}
}(conn, inst, logger)
}(dconn, inst, logger)
}
}

Expand Down