Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert blockservice sessions context #578

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ The following emojis are used to highlight certain changes:

### Removed

- 🛠 `blockservice`: reverted change in previous release where sessions had been introduced.

### Fixed

### Security
Expand Down
55 changes: 0 additions & 55 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,6 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
// directly.
// Sessions are lazily setup, this is cheap.
func NewSession(ctx context.Context, bs BlockService) *Session {
ses := grabSessionFromContext(ctx, bs)
if ses != nil {
return ses
}

return newSession(ctx, bs)
}

// newSession is like [NewSession] but it does not attempt to reuse session from the existing context.
func newSession(ctx context.Context, bs BlockService) *Session {
return &Session{bs: bs, sesctx: ctx}
}

Expand Down Expand Up @@ -232,10 +222,6 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
if ses := grabSessionFromContext(ctx, s); ses != nil {
return ses.GetBlock(ctx, c)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

Expand Down Expand Up @@ -295,10 +281,6 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
if ses := grabSessionFromContext(ctx, s); ses != nil {
return ses.GetBlocks(ctx, ks)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
defer span.End()

Expand Down Expand Up @@ -474,43 +456,6 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo

var _ BlockGetter = (*Session)(nil)

// ContextWithSession is a helper which creates a context with an embded session,
// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
// will be redirected to this same session instead.
// Sessions are lazily setup, this is cheap.
// It wont make a new session if one exists already in the context.
func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
if grabSessionFromContext(ctx, bs) != nil {
return ctx
}
return EmbedSessionInContext(ctx, newSession(ctx, bs))
}

// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session.
func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context {
// use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
return context.WithValue(ctx, ses.bs, ses)
}

// grabSessionFromContext returns nil if the session was not found
// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
func grabSessionFromContext(ctx context.Context, bs BlockService) *Session {
s := ctx.Value(bs)
if s == nil {
return nil
}

ss, ok := s.(*Session)
if !ok {
// idk what to do here, that kinda sucks, giveup
return nil
}

return ss
}

// grabAllowlistFromBlockservice never returns nil
func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
if bbs, ok := bs.(BoundedBlockService); ok {
Expand Down
65 changes: 0 additions & 65 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,68 +288,3 @@ func TestAllowlist(t *testing.T) {
check(blockservice.GetBlock)
check(NewSession(ctx, blockservice).GetBlock)
}

type fakeIsNewSessionCreateExchange struct {
ses exchange.Fetcher
newSessionWasCalled bool
}

var _ exchange.SessionExchange = (*fakeIsNewSessionCreateExchange)(nil)

func (*fakeIsNewSessionCreateExchange) Close() error {
return nil
}

func (*fakeIsNewSessionCreateExchange) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
panic("should call on the session")
}

func (*fakeIsNewSessionCreateExchange) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
panic("should call on the session")
}

func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fetcher {
f.newSessionWasCalled = true
return f.ses
}

func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error {
return nil
}

func TestContextSession(t *testing.T) {
t.Parallel()
a := assert.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bgen := butil.NewBlockGenerator()
block1 := bgen.Next()
block2 := bgen.Next()

bs := blockstore.NewBlockstore(ds.NewMapDatastore())
a.NoError(bs.Put(ctx, block1))
a.NoError(bs.Put(ctx, block2))
sesEx := &fakeIsNewSessionCreateExchange{ses: offline.Exchange(bs)}

service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx)

ctx = ContextWithSession(ctx, service)

b, err := service.GetBlock(ctx, block1.Cid())
a.NoError(err)
a.Equal(b.RawData(), block1.RawData())
a.True(sesEx.newSessionWasCalled, "new session from context should be created")
sesEx.newSessionWasCalled = false

bchan := service.GetBlocks(ctx, []cid.Cid{block2.Cid()})
a.Equal((<-bchan).RawData(), block2.RawData())
a.False(sesEx.newSessionWasCalled, "session should be reused in context")

a.Equal(
NewSession(ctx, service),
NewSession(ContextWithSession(ctx, service), service),
"session must be deduped in all invocations on the same context",
)
}
6 changes: 0 additions & 6 deletions gateway/blocks_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,12 +689,6 @@ func (bb *BlocksBackend) IsCached(ctx context.Context, p path.Path) bool {
return has
}

var _ WithContextHint = (*BlocksBackend)(nil)

func (bb *BlocksBackend) WrapContextForRequest(ctx context.Context) context.Context {
return blockservice.ContextWithSession(ctx, bb.blockService)
}

func (bb *BlocksBackend) ResolvePath(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, error) {
roots, lastSeg, remainder, err := bb.getPathRoots(ctx, path)
if err != nil {
Expand Down
Loading