Skip to content

Commit

Permalink
Refactor database code to support standalone batches, transactions.
Browse files Browse the repository at this point in the history
This is spin-off of changes from #459.

Transactions are not being used yet, but batches are updated to work
with the new API.

`database/` package was refactored to split abstract interfaces and
implementation via goleveldb. This should make it easier to implement
new database types.
  • Loading branch information
smira committed Aug 1, 2019
1 parent 2e7f624 commit 34deb04
Show file tree
Hide file tree
Showing 27 changed files with 544 additions and 322 deletions.
6 changes: 3 additions & 3 deletions cmd/db_cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,15 @@ func aptlyDbCleanup(cmd *commander.Command, args []string) error {
}

if !dryRun {
db.StartBatch()
batch := db.CreateBatch()
err = toDelete.ForEach(func(ref []byte) error {
return context.CollectionFactory().PackageCollection().DeleteByKey(ref)
return context.CollectionFactory().PackageCollection().DeleteByKey(ref, batch)
})
if err != nil {
return err
}

err = db.FinishBatch()
err = batch.Write()
if err != nil {
return fmt.Errorf("unable to write to DB: %s", err)
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/db_recover.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package cmd

import (
"github.com/aptly-dev/aptly/database"
"github.com/smira/commander"

"github.com/aptly-dev/aptly/database/goleveldb"
)

// aptly db recover
Expand All @@ -15,7 +16,7 @@ func aptlyDbRecover(cmd *commander.Command, args []string) error {
}

context.Progress().Printf("Recovering database...\n")
err = database.RecoverDB(context.DBPath())
err = goleveldb.RecoverDB(context.DBPath())

return err
}
Expand Down
3 changes: 2 additions & 1 deletion context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/console"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/goleveldb"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/files"
"github.com/aptly-dev/aptly/http"
Expand Down Expand Up @@ -244,7 +245,7 @@ func (context *AptlyContext) _database() (database.Storage, error) {
if context.database == nil {
var err error

context.database, err = database.NewDB(context.dbPath())
context.database, err = goleveldb.NewDB(context.dbPath())
if err != nil {
return nil, fmt.Errorf("can't instantiate database: %s", err)
}
Expand Down
69 changes: 69 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Package database provides KV database for meta-information
package database

import "errors"

// Errors for Storage
var (
ErrNotFound = errors.New("key not found")
)

// StorageProcessor is a function to process one single storage entry
type StorageProcessor func(key []byte, value []byte) error

// Reader provides KV read calls
type Reader interface {
Get(key []byte) ([]byte, error)
}

// PrefixReader provides prefixed operations
type PrefixReader interface {
HasPrefix(prefix []byte) bool
ProcessByPrefix(prefix []byte, proc StorageProcessor) error
KeysByPrefix(prefix []byte) [][]byte
FetchByPrefix(prefix []byte) [][]byte
}

// Writer provides KV update/delete calls
type Writer interface {
Put(key []byte, value []byte) error
Delete(key []byte) error
}

// Storage is an interface to KV storage
type Storage interface {
Reader
Writer

PrefixReader

CreateBatch() Batch
OpenTransaction() (Transaction, error)

CreateTemporary() (Storage, error)

Open() error
Close() error
CompactDB() error
Drop() error
}

// Batch provides a way to pack many writes.
type Batch interface {
Writer

// Write closes batch and send accumulated writes to the database
Write() error
}

// Transaction provides isolated atomic way to perform updates.
//
// Transactions might be expensive.
// Transaction should always finish with either Discard() or Commit()
type Transaction interface {
Reader
Writer

Commit() error
Discard()
}
34 changes: 34 additions & 0 deletions database/goleveldb/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package goleveldb

import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"

"github.com/aptly-dev/aptly/database"
)

type batch struct {
db *leveldb.DB
b *leveldb.Batch
}

func (b *batch) Put(key, value []byte) error {
b.b.Put(key, value)

return nil
}

func (b *batch) Delete(key []byte) error {
b.b.Delete(key)

return nil
}

func (b *batch) Write() error {
return b.db.Write(b.b, &opt.WriteOptions{})
}

// batch should implement database.Batch
var (
_ database.Batch = &batch{}
)
58 changes: 58 additions & 0 deletions database/goleveldb/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package goleveldb

import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldbstorage "github.com/syndtr/goleveldb/leveldb/storage"

"github.com/aptly-dev/aptly/database"
)

func internalOpen(path string, throttleCompaction bool) (*leveldb.DB, error) {
o := &opt.Options{
Filter: filter.NewBloomFilter(10),
OpenFilesCacheCapacity: 256,
}

if throttleCompaction {
o.CompactionL0Trigger = 32
o.WriteL0PauseTrigger = 96
o.WriteL0SlowdownTrigger = 64
}

return leveldb.OpenFile(path, o)
}

// NewDB creates new instance of DB, but doesn't open it (yet)
func NewDB(path string) (database.Storage, error) {
return &storage{path: path}, nil
}

// NewOpenDB creates new instance of DB and opens it
func NewOpenDB(path string) (database.Storage, error) {
db, err := NewDB(path)
if err != nil {
return nil, err
}

return db, db.Open()
}

// RecoverDB recovers LevelDB database from corruption
func RecoverDB(path string) error {
stor, err := leveldbstorage.OpenFile(path, false)
if err != nil {
return err
}

db, err := leveldb.Recover(stor, nil)
if err != nil {
return err
}

db.Close()
stor.Close()

return nil
}
109 changes: 94 additions & 15 deletions database/leveldb_test.go → database/goleveldb/database_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package database
package goleveldb_test

import (
"testing"

. "gopkg.in/check.v1"

"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/goleveldb"
)

// Launch gocheck tests
Expand All @@ -13,7 +16,7 @@ func Test(t *testing.T) {

type LevelDBSuite struct {
path string
db Storage
db database.Storage
}

var _ = Suite(&LevelDBSuite{})
Expand All @@ -22,7 +25,7 @@ func (s *LevelDBSuite) SetUpTest(c *C) {
var err error

s.path = c.MkDir()
s.db, err = NewOpenDB(s.path)
s.db, err = goleveldb.NewOpenDB(s.path)
c.Assert(err, IsNil)
}

Expand All @@ -43,10 +46,10 @@ func (s *LevelDBSuite) TestRecoverDB(c *C) {
err = s.db.Close()
c.Check(err, IsNil)

err = RecoverDB(s.path)
err = goleveldb.RecoverDB(s.path)
c.Check(err, IsNil)

s.db, err = NewOpenDB(s.path)
s.db, err = goleveldb.NewOpenDB(s.path)
c.Check(err, IsNil)

result, err := s.db.Get(key)
Expand Down Expand Up @@ -143,11 +146,11 @@ func (s *LevelDBSuite) TestByPrefix(c *C) {
c.Check(keys, DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})

c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error {
return ErrNotFound
}), Equals, ErrNotFound)
return database.ErrNotFound
}), Equals, database.ErrNotFound)

c.Check(s.db.ProcessByPrefix([]byte{0xa0}, func(k, v []byte) error {
return ErrNotFound
return database.ErrNotFound
}), IsNil)

c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
Expand Down Expand Up @@ -176,9 +179,9 @@ func (s *LevelDBSuite) TestBatch(c *C) {
err := s.db.Put(key, value)
c.Assert(err, IsNil)

s.db.StartBatch()
s.db.Put(key2, value2)
s.db.Delete(key)
batch := s.db.CreateBatch()
batch.Put(key2, value2)
batch.Delete(key)

v, err := s.db.Get(key)
c.Check(err, IsNil)
Expand All @@ -187,7 +190,7 @@ func (s *LevelDBSuite) TestBatch(c *C) {
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")

err = s.db.FinishBatch()
err = batch.Write()
c.Check(err, IsNil)

v2, err := s.db.Get(key2)
Expand All @@ -196,11 +199,87 @@ func (s *LevelDBSuite) TestBatch(c *C) {

_, err = s.db.Get(key)
c.Check(err, ErrorMatches, "key not found")
}

func (s *LevelDBSuite) TestTransactionCommit(c *C) {
var (
key = []byte("key")
key2 = []byte("key2")
value = []byte("value")
value2 = []byte("value2")
)

err := s.db.Put(key, value)
c.Assert(err, IsNil)

transaction, err := s.db.OpenTransaction()
c.Assert(err, IsNil)
transaction.Put(key2, value2)
transaction.Delete(key)

v, err := s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)

_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")

v2, err := transaction.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)

_, err = transaction.Get(key)
c.Check(err, ErrorMatches, "key not found")

err = transaction.Commit()
c.Check(err, IsNil)

v2, err = s.db.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)

_, err = s.db.Get(key)
c.Check(err, ErrorMatches, "key not found")
}

func (s *LevelDBSuite) TestTransactionDiscard(c *C) {
var (
key = []byte("key")
key2 = []byte("key2")
value = []byte("value")
value2 = []byte("value2")
)

err := s.db.Put(key, value)
c.Assert(err, IsNil)

transaction, err := s.db.OpenTransaction()
c.Assert(err, IsNil)
transaction.Put(key2, value2)
transaction.Delete(key)

v, err := s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)

_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")

v2, err := transaction.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)

_, err = transaction.Get(key)
c.Check(err, ErrorMatches, "key not found")

transaction.Discard()

c.Check(func() { s.db.FinishBatch() }, Panics, "no batch")
v, err = s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)

s.db.StartBatch()
c.Check(func() { s.db.StartBatch() }, Panics, "batch already started")
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")
}

func (s *LevelDBSuite) TestCompactDB(c *C) {
Expand Down
2 changes: 2 additions & 0 deletions database/goleveldb/leveldb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package goleveldb implements database interface via goleveldb
package goleveldb
Loading

0 comments on commit 34deb04

Please sign in to comment.