Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor database code to support standalone batches, transactions. #861

Merged
merged 1 commit into from
Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/db_cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,15 @@ func aptlyDbCleanup(cmd *commander.Command, args []string) error {
}

if !dryRun {
db.StartBatch()
batch := db.CreateBatch()
err = toDelete.ForEach(func(ref []byte) error {
return context.CollectionFactory().PackageCollection().DeleteByKey(ref)
return context.CollectionFactory().PackageCollection().DeleteByKey(ref, batch)
})
if err != nil {
return err
}

err = db.FinishBatch()
err = batch.Write()
if err != nil {
return fmt.Errorf("unable to write to DB: %s", err)
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/db_recover.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package cmd

import (
"github.com/aptly-dev/aptly/database"
"github.com/smira/commander"

"github.com/aptly-dev/aptly/database/goleveldb"
)

// aptly db recover
Expand All @@ -15,7 +16,7 @@ func aptlyDbRecover(cmd *commander.Command, args []string) error {
}

context.Progress().Printf("Recovering database...\n")
err = database.RecoverDB(context.DBPath())
err = goleveldb.RecoverDB(context.DBPath())

return err
}
Expand Down
3 changes: 2 additions & 1 deletion context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/console"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/goleveldb"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/files"
"github.com/aptly-dev/aptly/http"
Expand Down Expand Up @@ -252,7 +253,7 @@ func (context *AptlyContext) _database() (database.Storage, error) {
if context.database == nil {
var err error

context.database, err = database.NewDB(context.dbPath())
context.database, err = goleveldb.NewDB(context.dbPath())
if err != nil {
return nil, fmt.Errorf("can't instantiate database: %s", err)
}
Expand Down
69 changes: 69 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Package database provides KV database for meta-information
package database

import "errors"

// Errors for Storage
var (
ErrNotFound = errors.New("key not found")
)

// StorageProcessor is a function to process one single storage entry
type StorageProcessor func(key []byte, value []byte) error

// Reader provides KV read calls
type Reader interface {
Get(key []byte) ([]byte, error)
}

// PrefixReader provides prefixed operations
type PrefixReader interface {
HasPrefix(prefix []byte) bool
ProcessByPrefix(prefix []byte, proc StorageProcessor) error
KeysByPrefix(prefix []byte) [][]byte
FetchByPrefix(prefix []byte) [][]byte
}

// Writer provides KV update/delete calls
type Writer interface {
Put(key []byte, value []byte) error
Delete(key []byte) error
}

// Storage is an interface to KV storage
type Storage interface {
Reader
Writer

PrefixReader

CreateBatch() Batch
OpenTransaction() (Transaction, error)

CreateTemporary() (Storage, error)

Open() error
Close() error
CompactDB() error
Drop() error
}

// Batch provides a way to pack many writes.
type Batch interface {
Writer

// Write closes batch and send accumulated writes to the database
Write() error
}

// Transaction provides isolated atomic way to perform updates.
//
// Transactions might be expensive.
// Transaction should always finish with either Discard() or Commit()
type Transaction interface {
Reader
Writer

Commit() error
Discard()
}
34 changes: 34 additions & 0 deletions database/goleveldb/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package goleveldb

import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"

"github.com/aptly-dev/aptly/database"
)

type batch struct {
db *leveldb.DB
b *leveldb.Batch
}

func (b *batch) Put(key, value []byte) error {
b.b.Put(key, value)

return nil
}

func (b *batch) Delete(key []byte) error {
b.b.Delete(key)

return nil
}

func (b *batch) Write() error {
return b.db.Write(b.b, &opt.WriteOptions{})
}

// batch should implement database.Batch
var (
_ database.Batch = &batch{}
)
58 changes: 58 additions & 0 deletions database/goleveldb/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package goleveldb

import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldbstorage "github.com/syndtr/goleveldb/leveldb/storage"

"github.com/aptly-dev/aptly/database"
)

func internalOpen(path string, throttleCompaction bool) (*leveldb.DB, error) {
o := &opt.Options{
Filter: filter.NewBloomFilter(10),
OpenFilesCacheCapacity: 256,
}

if throttleCompaction {
o.CompactionL0Trigger = 32
o.WriteL0PauseTrigger = 96
o.WriteL0SlowdownTrigger = 64
}

return leveldb.OpenFile(path, o)
}

// NewDB creates new instance of DB, but doesn't open it (yet)
func NewDB(path string) (database.Storage, error) {
return &storage{path: path}, nil
}

// NewOpenDB creates new instance of DB and opens it
func NewOpenDB(path string) (database.Storage, error) {
db, err := NewDB(path)
if err != nil {
return nil, err
}

return db, db.Open()
}

// RecoverDB recovers LevelDB database from corruption
func RecoverDB(path string) error {
stor, err := leveldbstorage.OpenFile(path, false)
if err != nil {
return err
}

db, err := leveldb.Recover(stor, nil)
if err != nil {
return err
}

db.Close()
stor.Close()

return nil
}
109 changes: 94 additions & 15 deletions database/leveldb_test.go → database/goleveldb/database_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package database
package goleveldb_test

import (
"testing"

. "gopkg.in/check.v1"

"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/goleveldb"
)

// Launch gocheck tests
Expand All @@ -13,7 +16,7 @@ func Test(t *testing.T) {

type LevelDBSuite struct {
path string
db Storage
db database.Storage
}

var _ = Suite(&LevelDBSuite{})
Expand All @@ -22,7 +25,7 @@ func (s *LevelDBSuite) SetUpTest(c *C) {
var err error

s.path = c.MkDir()
s.db, err = NewOpenDB(s.path)
s.db, err = goleveldb.NewOpenDB(s.path)
c.Assert(err, IsNil)
}

Expand All @@ -43,10 +46,10 @@ func (s *LevelDBSuite) TestRecoverDB(c *C) {
err = s.db.Close()
c.Check(err, IsNil)

err = RecoverDB(s.path)
err = goleveldb.RecoverDB(s.path)
c.Check(err, IsNil)

s.db, err = NewOpenDB(s.path)
s.db, err = goleveldb.NewOpenDB(s.path)
c.Check(err, IsNil)

result, err := s.db.Get(key)
Expand Down Expand Up @@ -143,11 +146,11 @@ func (s *LevelDBSuite) TestByPrefix(c *C) {
c.Check(keys, DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})

c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error {
return ErrNotFound
}), Equals, ErrNotFound)
return database.ErrNotFound
}), Equals, database.ErrNotFound)

c.Check(s.db.ProcessByPrefix([]byte{0xa0}, func(k, v []byte) error {
return ErrNotFound
return database.ErrNotFound
}), IsNil)

c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
Expand Down Expand Up @@ -176,9 +179,9 @@ func (s *LevelDBSuite) TestBatch(c *C) {
err := s.db.Put(key, value)
c.Assert(err, IsNil)

s.db.StartBatch()
s.db.Put(key2, value2)
s.db.Delete(key)
batch := s.db.CreateBatch()
batch.Put(key2, value2)
batch.Delete(key)

v, err := s.db.Get(key)
c.Check(err, IsNil)
Expand All @@ -187,7 +190,7 @@ func (s *LevelDBSuite) TestBatch(c *C) {
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")

err = s.db.FinishBatch()
err = batch.Write()
c.Check(err, IsNil)

v2, err := s.db.Get(key2)
Expand All @@ -196,11 +199,87 @@ func (s *LevelDBSuite) TestBatch(c *C) {

_, err = s.db.Get(key)
c.Check(err, ErrorMatches, "key not found")
}

func (s *LevelDBSuite) TestTransactionCommit(c *C) {
var (
key = []byte("key")
key2 = []byte("key2")
value = []byte("value")
value2 = []byte("value2")
)

err := s.db.Put(key, value)
c.Assert(err, IsNil)

transaction, err := s.db.OpenTransaction()
c.Assert(err, IsNil)
transaction.Put(key2, value2)
transaction.Delete(key)

v, err := s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)

_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")

v2, err := transaction.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)

_, err = transaction.Get(key)
c.Check(err, ErrorMatches, "key not found")

err = transaction.Commit()
c.Check(err, IsNil)

v2, err = s.db.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)

_, err = s.db.Get(key)
c.Check(err, ErrorMatches, "key not found")
}

func (s *LevelDBSuite) TestTransactionDiscard(c *C) {
var (
key = []byte("key")
key2 = []byte("key2")
value = []byte("value")
value2 = []byte("value2")
)

err := s.db.Put(key, value)
c.Assert(err, IsNil)

transaction, err := s.db.OpenTransaction()
c.Assert(err, IsNil)
transaction.Put(key2, value2)
transaction.Delete(key)

v, err := s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)

_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")

v2, err := transaction.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)

_, err = transaction.Get(key)
c.Check(err, ErrorMatches, "key not found")

transaction.Discard()

c.Check(func() { s.db.FinishBatch() }, Panics, "no batch")
v, err = s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)

s.db.StartBatch()
c.Check(func() { s.db.StartBatch() }, Panics, "batch already started")
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")
}

func (s *LevelDBSuite) TestCompactDB(c *C) {
Expand Down
2 changes: 2 additions & 0 deletions database/goleveldb/leveldb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package goleveldb implements database interface via goleveldb
package goleveldb
Loading