Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistently use transactions to update database #866

Merged
merged 1 commit into from
Aug 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func apiReposPackageFromDir(c *gin.Context) {
}

processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection())
context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...)

processedFiles = append(processedFiles, otherFiles...)
Expand Down Expand Up @@ -420,7 +420,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
repoTemplateString, context.Progress(), localRepoCollection, context.CollectionFactory().PackageCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection(), nil, query.Parse)
context.PackagePool(), context.CollectionFactory().ChecksumCollection, nil, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)

if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions aptly/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"

"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/utils"
)

Expand Down Expand Up @@ -134,6 +135,9 @@ type Downloader interface {
GetLength(ctx context.Context, url string) (int64, error)
}

// ChecksumStorageProvider creates ChecksumStorage based on DB
type ChecksumStorageProvider func(db database.ReaderWriter) ChecksumStorage

// ChecksumStorage is stores checksums in some (persistent) storage
type ChecksumStorage interface {
// Get finds checksums in DB by path
Expand Down
4 changes: 2 additions & 2 deletions cmd/mirror_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {

context.Progress().Printf("Building download queue...\n")
queue, downloadSize, err = repo.BuildDownloadQueue(context.PackagePool(), context.CollectionFactory().PackageCollection(),
context.CollectionFactory().ChecksumCollection(), skipExistingPackages)
context.CollectionFactory().ChecksumCollection(nil), skipExistingPackages)

if err != nil {
return fmt.Errorf("unable to update: %s", err)
Expand Down Expand Up @@ -210,7 +210,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
}

// and import it back to the pool
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection())
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection(nil))
if err != nil {
return fmt.Errorf("unable to import file: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/repo_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func aptlyRepoAdd(cmd *commander.Command, args []string) error {

processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
context.CollectionFactory().PackageCollection(), &aptly.ConsoleResultReporter{Progress: context.Progress()}, nil,
context.CollectionFactory().ChecksumCollection())
context.CollectionFactory().ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...)
if err != nil {
return fmt.Errorf("unable to import package files: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/repo_include.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func aptlyRepoInclude(cmd *commander.Command, args []string) error {
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles, verifier, repoTemplateString,
context.Progress(), context.CollectionFactory().LocalRepoCollection(), context.CollectionFactory().PackageCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection,
uploaders, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)

Expand Down
6 changes: 6 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ type Writer interface {
Delete(key []byte) error
}

// ReaderWriter combines Reader and Writer
type ReaderWriter interface {
Reader
Writer
}

// Storage is an interface to KV storage
type Storage interface {
Reader
Expand Down
4 changes: 2 additions & 2 deletions deb/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func CollectChangesFiles(locations []string, reporter aptly.ResultReporter) (cha
// ImportChangesFiles imports referenced files in changes files into local repository
func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles bool,
verifier pgp.Verifier, repoTemplateString string, progress aptly.Progress, localRepoCollection *LocalRepoCollection, packageCollection *PackageCollection,
pool aptly.PackagePool, checksumStorage aptly.ChecksumStorage, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) {
pool aptly.PackagePool, checksumStorageProvider aptly.ChecksumStorageProvider, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) {

var repoTemplate *template.Template
repoTemplate, err = template.New("repo").Parse(repoTemplateString)
Expand Down Expand Up @@ -384,7 +384,7 @@ func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, ac
var processedFiles2, failedFiles2 []string

processedFiles2, failedFiles2, err = ImportPackageFiles(list, packageFiles, forceReplace, verifier, pool,
packageCollection, reporter, restriction, checksumStorage)
packageCollection, reporter, restriction, checksumStorageProvider)

if err != nil {
return nil, nil, fmt.Errorf("unable to import package files: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion deb/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *ChangesSuite) TestImportChangesFiles(c *C) {
processedFiles, failedFiles, err := ImportChangesFiles(
append(changesFiles, "testdata/changes/notexistent.changes"),
s.Reporter, true, true, false, false, &NullVerifier{},
"test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, s.checksumStorage,
"test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, func(database.ReaderWriter) aptly.ChecksumStorage { return s.checksumStorage },
nil, nil)
c.Assert(err, IsNil)
c.Check(failedFiles, DeepEquals, append(expectedFailedFiles, "testdata/changes/notexistent.changes"))
Expand Down
4 changes: 2 additions & 2 deletions deb/checksum_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (

// ChecksumCollection does management of ChecksumInfo in DB
type ChecksumCollection struct {
db database.Storage
db database.ReaderWriter
codecHandle *codec.MsgpackHandle
}

// NewChecksumCollection creates new ChecksumCollection and binds it to database
func NewChecksumCollection(db database.Storage) *ChecksumCollection {
func NewChecksumCollection(db database.ReaderWriter) *ChecksumCollection {
return &ChecksumCollection{
db: db,
codecHandle: &codec.MsgpackHandle{},
Expand Down
7 changes: 6 additions & 1 deletion deb/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package deb
import (
"sync"

"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/database"
)

Expand Down Expand Up @@ -91,10 +92,14 @@ func (factory *CollectionFactory) PublishedRepoCollection() *PublishedRepoCollec
}

// ChecksumCollection returns (or creates) new ChecksumCollection
func (factory *CollectionFactory) ChecksumCollection() *ChecksumCollection {
func (factory *CollectionFactory) ChecksumCollection(db database.ReaderWriter) aptly.ChecksumStorage {
factory.Lock()
defer factory.Unlock()

if db != nil {
return NewChecksumCollection(db)
}

if factory.checksums == nil {
factory.checksums = NewChecksumCollection(factory.db)
}
Expand Down
14 changes: 11 additions & 3 deletions deb/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,19 @@ func CollectPackageFiles(locations []string, reporter aptly.ResultReporter) (pac
// ImportPackageFiles imports files into local repository
func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace bool, verifier pgp.Verifier,
pool aptly.PackagePool, collection *PackageCollection, reporter aptly.ResultReporter, restriction PackageQuery,
checksumStorage aptly.ChecksumStorage) (processedFiles []string, failedFiles []string, err error) {
checksumStorageProvider aptly.ChecksumStorageProvider) (processedFiles []string, failedFiles []string, err error) {
if forceReplace {
list.PrepareIndex()
}

transaction, err := collection.db.OpenTransaction()
if err != nil {
return nil, nil, err
}
defer transaction.Discard()

checksumStorage := checksumStorageProvider(transaction)

for _, file := range packageFiles {
var (
stanza Stanza
Expand Down Expand Up @@ -193,7 +201,7 @@ func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace b
continue
}

err = collection.Update(p)
err = collection.UpdateInTransaction(p, transaction)
if err != nil {
reporter.Warning("Unable to save package %s: %s", p, err)
failedFiles = append(failedFiles, file)
Expand All @@ -219,6 +227,6 @@ func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace b
processedFiles = append(processedFiles, candidateProcessedFiles...)
}

err = nil
err = transaction.Commit()
return
}
34 changes: 26 additions & 8 deletions deb/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,23 @@ func (collection *LocalRepoCollection) Add(repo *LocalRepo) error {

// Update stores updated information about repo in DB
func (collection *LocalRepoCollection) Update(repo *LocalRepo) error {
err := collection.db.Put(repo.Key(), repo.Encode())
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()

err = transaction.Put(repo.Key(), repo.Encode())
if err != nil {
return err
}
if repo.packageRefs != nil {
err = collection.db.Put(repo.RefKey(), repo.packageRefs.Encode())
err = transaction.Put(repo.RefKey(), repo.packageRefs.Encode())
if err != nil {
return err
}
}
return nil
return transaction.Commit()
}

// LoadComplete loads additional information for local repo
Expand Down Expand Up @@ -245,16 +251,28 @@ func (collection *LocalRepoCollection) Len() int {

// Drop removes remote repo from collection
func (collection *LocalRepoCollection) Drop(repo *LocalRepo) error {
if _, err := collection.db.Get(repo.Key()); err == database.ErrNotFound {
panic("local repo not found!")
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()

delete(collection.cache, repo.UUID)

err := collection.db.Delete(repo.Key())
if err != nil {
if _, err = transaction.Get(repo.Key()); err != nil {
if err == database.ErrNotFound {
return errors.New("local repo not found")
}
return err
}

if err = transaction.Delete(repo.Key()); err != nil {
return err
}

if err = transaction.Delete(repo.RefKey()); err != nil {
return err
}

return collection.db.Delete(repo.RefKey())
return transaction.Commit()
}
2 changes: 1 addition & 1 deletion deb/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,5 @@ func (s *LocalRepoCollectionSuite) TestDrop(c *C) {
r2, _ := collection.ByName("local2")
c.Check(r2.String(), Equals, repo2.String())

c.Check(func() { s.collection.Drop(repo1) }, Panics, "local repo not found!")
c.Check(s.collection.Drop(repo1), ErrorMatches, "local repo not found")
}
28 changes: 21 additions & 7 deletions deb/package_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,21 +201,35 @@ func (collection *PackageCollection) loadContents(p *Package, packagePool aptly.
return contents
}

// Update adds or updates information about package in DB checking for conficts first
// Update adds or updates information about package in DB
func (collection *PackageCollection) Update(p *Package) error {
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()

if err = collection.UpdateInTransaction(p, transaction); err != nil {
return err
}

return transaction.Commit()
}

// UpdateInTransaction updates/creates package info in the context of the outer transaction
func (collection *PackageCollection) UpdateInTransaction(p *Package, transaction database.Transaction) error {
var encodeBuffer bytes.Buffer

encoder := codec.NewEncoder(&encodeBuffer, collection.codecHandle)

encodeBuffer.Reset()
encodeBuffer.WriteByte(0xc1)
encodeBuffer.WriteByte(0x1)
err := encoder.Encode(p)
if err != nil {
if err := encoder.Encode(p); err != nil {
return err
}

err = collection.db.Put(p.Key(""), encodeBuffer.Bytes())
err := transaction.Put(p.Key(""), encodeBuffer.Bytes())
if err != nil {
return err
}
Expand All @@ -228,7 +242,7 @@ func (collection *PackageCollection) Update(p *Package) error {
return err
}

err = collection.db.Put(p.Key("xF"), encodeBuffer.Bytes())
err = transaction.Put(p.Key("xF"), encodeBuffer.Bytes())
if err != nil {
return err
}
Expand All @@ -241,7 +255,7 @@ func (collection *PackageCollection) Update(p *Package) error {
return err
}

err = collection.db.Put(p.Key("xD"), encodeBuffer.Bytes())
err = transaction.Put(p.Key("xD"), encodeBuffer.Bytes())
if err != nil {
return err
}
Expand All @@ -256,7 +270,7 @@ func (collection *PackageCollection) Update(p *Package) error {
return err
}

err = collection.db.Put(p.Key("xE"), encodeBuffer.Bytes())
err = transaction.Put(p.Key("xE"), encodeBuffer.Bytes())
if err != nil {
return err
}
Expand Down
31 changes: 22 additions & 9 deletions deb/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,21 +914,27 @@ func (collection *PublishedRepoCollection) CheckDuplicate(repo *PublishedRepo) *
}

// Update stores updated information about repo in DB
func (collection *PublishedRepoCollection) Update(repo *PublishedRepo) (err error) {
err = collection.db.Put(repo.Key(), repo.Encode())
func (collection *PublishedRepoCollection) Update(repo *PublishedRepo) error {
transaction, err := collection.db.OpenTransaction()
if err != nil {
return
return err
}
defer transaction.Discard()

err = transaction.Put(repo.Key(), repo.Encode())
if err != nil {
return err
}

if repo.SourceKind == SourceLocalRepo {
for component, item := range repo.sourceItems {
err = collection.db.Put(repo.RefKey(component), item.packageRefs.Encode())
err = transaction.Put(repo.RefKey(component), item.packageRefs.Encode())
if err != nil {
return
return err
}
}
}
return
return transaction.Commit()
}

// LoadComplete loads additional information for remote repo
Expand Down Expand Up @@ -1170,6 +1176,13 @@ func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly
storage, prefix, distribution string, collectionFactory *CollectionFactory, progress aptly.Progress,
force, skipCleanup bool) error {

transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()

// TODO: load via transaction
collection.loadList()

repo, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
Expand Down Expand Up @@ -1221,17 +1234,17 @@ func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly
}
}

err = collection.db.Delete(repo.Key())
err = transaction.Delete(repo.Key())
if err != nil {
return err
}

for _, component := range repo.Components() {
err = collection.db.Delete(repo.RefKey(component))
err = transaction.Delete(repo.RefKey(component))
if err != nil {
return err
}
}

return nil
return transaction.Commit()
}
Loading