Skip to content

Commit

Permalink
Reimplement DB collections for mirrors, repos and snapshots
Browse files Browse the repository at this point in the history
See #765, #761

Collections were relying on keeping in-memory list of all the objects
for any kind of operation which doesn't scale well the number of
objects in the database.

With this rewrite, objects are loaded only on demand which might
be pessimization in some edge cases but should improve performance
and memory footprint signifcantly.
  • Loading branch information
smira committed Aug 20, 2018
1 parent fb5985b commit 20a333f
Show file tree
Hide file tree
Showing 8 changed files with 332 additions and 204 deletions.
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ RUN_LONG_TESTS?=yes

GO_1_10_AND_HIGHER=$(shell (printf '%s\n' go1.10 $(GOVERSION) | sort -cV >/dev/null 2>&1) && echo "yes")

all: test check system-test
all: test bench check system-test

prepare:
go get -u github.com/alecthomas/gometalinter
Expand Down Expand Up @@ -57,6 +57,13 @@ else
go test -v `go list ./... | grep -v vendor/` -gocheck.v=true
endif

bench:
ifeq ($(GO_1_10_AND_HIGHER), yes)
go test -v ./... -run=nothing -bench=. -benchmem
else
go test -v `go list ./... | grep -v vendor/` -run=nothing -bench=. -benchmem
endif

mem.png: mem.dat mem.gp
gnuplot mem.gp
open mem.png
Expand Down
116 changes: 65 additions & 51 deletions deb/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package deb

import (
"bytes"
"errors"
"fmt"
"log"
"sync"
Expand Down Expand Up @@ -93,52 +94,68 @@ func (repo *LocalRepo) RefKey() []byte {
// LocalRepoCollection does listing, updating/adding/deleting of LocalRepos
type LocalRepoCollection struct {
*sync.RWMutex
db database.Storage
list []*LocalRepo
db database.Storage
cache map[string]*LocalRepo
}

// NewLocalRepoCollection loads LocalRepos from DB and makes up collection
func NewLocalRepoCollection(db database.Storage) *LocalRepoCollection {
return &LocalRepoCollection{
RWMutex: &sync.RWMutex{},
db: db,
cache: make(map[string]*LocalRepo),
}
}

func (collection *LocalRepoCollection) loadList() {
if collection.list != nil {
return
func (collection *LocalRepoCollection) search(filter func(*LocalRepo) bool, unique bool) []*LocalRepo {
result := []*LocalRepo(nil)
for _, r := range collection.cache {
if filter(r) {
result = append(result, r)
}
}

blobs := collection.db.FetchByPrefix([]byte("L"))
collection.list = make([]*LocalRepo, 0, len(blobs))
if unique && len(result) > 0 {
return result
}

for _, blob := range blobs {
collection.db.ProcessByPrefix([]byte("L"), func(key, blob []byte) error {
r := &LocalRepo{}
if err := r.Decode(blob); err != nil {
log.Printf("Error decoding repo: %s\n", err)
} else {
collection.list = append(collection.list, r)
log.Printf("Error decoding local repo: %s\n", err)
return nil
}
}

if filter(r) {
if _, exists := collection.cache[r.UUID]; !exists {
collection.cache[r.UUID] = r
result = append(result, r)
if unique {
return errors.New("abort")
}
}
}

return nil
})

return result
}

// Add appends new repo to collection and saves it
func (collection *LocalRepoCollection) Add(repo *LocalRepo) error {
collection.loadList()
_, err := collection.ByName(repo.Name)

for _, r := range collection.list {
if r.Name == repo.Name {
return fmt.Errorf("local repo with name %s already exists", repo.Name)
}
if err == nil {
return fmt.Errorf("local repo with name %s already exists", repo.Name)
}

err := collection.Update(repo)
err = collection.Update(repo)
if err != nil {
return err
}

collection.list = append(collection.list, repo)
collection.cache[repo.UUID] = repo
return nil
}

Expand All @@ -159,8 +176,6 @@ func (collection *LocalRepoCollection) Update(repo *LocalRepo) error {

// LoadComplete loads additional information for local repo
func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {
collection.loadList()

encoded, err := collection.db.Get(repo.RefKey())
if err == database.ErrNotFound {
return nil
Expand All @@ -175,26 +190,39 @@ func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {

// ByName looks up repository by name
func (collection *LocalRepoCollection) ByName(name string) (*LocalRepo, error) {
collection.loadList()

for _, r := range collection.list {
if r.Name == name {
return r, nil
}
result := collection.search(func(r *LocalRepo) bool { return r.Name == name }, true)
if len(result) == 0 {
return nil, fmt.Errorf("local repo with name %s not found", name)
}
return nil, fmt.Errorf("local repo with name %s not found", name)

return result[0], nil
}

// ByUUID looks up repository by uuid
func (collection *LocalRepoCollection) ByUUID(uuid string) (*LocalRepo, error) {
collection.loadList()
if r, ok := collection.cache[uuid]; ok {
return r, nil
}

for _, r := range collection.list {
if r.UUID == uuid {
return r, nil
}
key := (&LocalRepo{UUID: uuid}).Key()

value, err := collection.db.Get(key)
if err == database.ErrNotFound {
return nil, fmt.Errorf("local repo with uuid %s not found", uuid)
}

if err != nil {
return nil, err
}

r := &LocalRepo{}
err = r.Decode(value)

if err == nil {
collection.cache[r.UUID] = r
}
return nil, fmt.Errorf("local repo with uuid %s not found", uuid)

return r, err
}

// ForEach runs method for each repository
Expand All @@ -212,30 +240,16 @@ func (collection *LocalRepoCollection) ForEach(handler func(*LocalRepo) error) e

// Len returns number of remote repos
func (collection *LocalRepoCollection) Len() int {
collection.loadList()

return len(collection.list)
return len(collection.db.KeysByPrefix([]byte("L")))
}

// Drop removes remote repo from collection
func (collection *LocalRepoCollection) Drop(repo *LocalRepo) error {
collection.loadList()

repoPosition := -1

for i, r := range collection.list {
if r == repo {
repoPosition = i
break
}
}

if repoPosition == -1 {
if _, err := collection.db.Get(repo.Key()); err == database.ErrNotFound {
panic("local repo not found!")
}

collection.list[len(collection.list)-1], collection.list[repoPosition], collection.list =
nil, collection.list[len(collection.list)-1], collection.list[:len(collection.list)-1]
delete(collection.cache, repo.UUID)

err := collection.db.Delete(repo.Key())
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions deb/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func (s *LocalRepoCollectionSuite) TestByUUID(c *C) {

r, err := s.collection.ByUUID(repo.UUID)
c.Assert(err, IsNil)
c.Assert(r, Equals, repo)

collection := NewLocalRepoCollection(s.db)
r, err = collection.ByUUID(repo.UUID)
c.Assert(err, IsNil)
c.Assert(r.String(), Equals, repo.String())
}

Expand Down
Loading

0 comments on commit 20a333f

Please sign in to comment.