Skip to content

Commit

Permalink
Rework HTTP downloader retry logic
Browse files Browse the repository at this point in the history
Apply retries as global, config-level option `downloadRetries` so that
it can be applied to any aptly command which downloads objects.

Unwrap `errors.Wrap` which is used in downloader.

Unwrap `*url.Error` which should be the actual error returned from the
HTTP client, catch more cases, be more specific around failures.
  • Loading branch information
smira committed Aug 7, 2019
1 parent 2e7f624 commit 5e56ac6
Show file tree
Hide file tree
Showing 22 changed files with 123 additions and 63 deletions.
2 changes: 1 addition & 1 deletion aptly/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type Downloader interface {
// Download starts new download task
Download(ctx context.Context, url string, destination string) error
// DownloadWithChecksum starts new download task with checksum verification
DownloadWithChecksum(ctx context.Context, url string, destination string, expected *utils.ChecksumInfo, ignoreMismatch bool, maxTries int) error
DownloadWithChecksum(ctx context.Context, url string, destination string, expected *utils.ChecksumInfo, ignoreMismatch bool) error
// GetProgress returns Progress object
GetProgress() Progress
// GetLength returns size by heading object with url
Expand Down
6 changes: 2 additions & 4 deletions cmd/mirror_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
}

ignoreMismatch := context.Flags().Lookup("ignore-checksums").Value.Get().(bool)
maxTries := context.Flags().Lookup("max-tries").Value.Get().(int)

verifier, err := getVerifier(context.Flags())
if err != nil {
Expand All @@ -54,7 +53,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
}

context.Progress().Printf("Downloading & parsing package files...\n")
err = repo.DownloadPackageIndexes(context.Progress(), context.Downloader(), verifier, context.CollectionFactory(), ignoreMismatch, maxTries)
err = repo.DownloadPackageIndexes(context.Progress(), context.Downloader(), verifier, context.CollectionFactory(), ignoreMismatch)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
}
Expand Down Expand Up @@ -173,8 +172,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
repo.PackageURL(task.File.DownloadURL()).String(),
task.TempDownPath,
&task.File.Checksums,
ignoreMismatch,
maxTries)
ignoreMismatch)
if e != nil {
pushError(e)
continue
Expand Down
10 changes: 9 additions & 1 deletion context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,15 @@ func (context *AptlyContext) Downloader() aptly.Downloader {
if downloadLimit == 0 {
downloadLimit = context.config().DownloadLimit
}
context.downloader = http.NewDownloader(downloadLimit*1024, context._progress())
maxTries := context.config().DownloadRetries + 1
maxTriesFlag := context.flags.Lookup("max-tries")
if maxTriesFlag != nil {
maxTriesFlagValue := maxTriesFlag.Value.Get().(int)
if maxTriesFlagValue > maxTries {
maxTries = maxTriesFlagValue
}
}
context.downloader = http.NewDownloader(downloadLimit*1024, maxTries, context._progress())
}

return context.downloader
Expand Down
4 changes: 2 additions & 2 deletions deb/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ ok:

// DownloadPackageIndexes downloads & parses package index files
func (repo *RemoteRepo) DownloadPackageIndexes(progress aptly.Progress, d aptly.Downloader, verifier pgp.Verifier, collectionFactory *CollectionFactory,
ignoreMismatch bool, maxTries int) error {
ignoreMismatch bool) error {
if repo.packageList != nil {
panic("packageList != nil")
}
Expand Down Expand Up @@ -456,7 +456,7 @@ func (repo *RemoteRepo) DownloadPackageIndexes(progress aptly.Progress, d aptly.

for _, info := range packagesPaths {
path, kind, component, architecture := info[0], info[1], info[2], info[3]
packagesReader, packagesFile, err := http.DownloadTryCompression(gocontext.TODO(), d, repo.IndexesRootURL(), path, repo.ReleaseFiles, ignoreMismatch, maxTries)
packagesReader, packagesFile, err := http.DownloadTryCompression(gocontext.TODO(), d, repo.IndexesRootURL(), path, repo.ReleaseFiles, ignoreMismatch)

isInstaller := kind == PackageTypeInstaller
if err != nil {
Expand Down
26 changes: 13 additions & 13 deletions deb/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (s *RemoteRepoSuite) TestDownload(c *C) {
s.downloader.ExpectError("http://mirror.yandex.ru/debian/dists/squeeze/main/binary-i386/Packages.gz", &http.Error{Code: 404})
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/dists/squeeze/main/binary-i386/Packages", examplePackagesFile)

err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false, 1)
err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(s.downloader.Empty(), Equals, true)

Expand All @@ -303,7 +303,7 @@ func (s *RemoteRepoSuite) TestDownload(c *C) {
s.downloader.ExpectError("http://mirror.yandex.ru/debian/dists/squeeze/main/binary-i386/Packages.gz", &http.Error{Code: 404})
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/dists/squeeze/main/binary-i386/Packages", examplePackagesFile)

err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false, 1)
err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(s.downloader.Empty(), Equals, true)

Expand All @@ -324,7 +324,7 @@ func (s *RemoteRepoSuite) TestDownload(c *C) {
s.downloader.ExpectError("http://mirror.yandex.ru/debian/dists/squeeze/main/binary-i386/Packages.gz", &http.Error{Code: 404})
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/dists/squeeze/main/binary-i386/Packages", examplePackagesFile)

err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false, 1)
err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(s.downloader.Empty(), Equals, true)

Expand All @@ -351,7 +351,7 @@ func (s *RemoteRepoSuite) TestDownloadWithInstaller(c *C) {
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/dists/squeeze/main/installer-i386/current/images/SHA256SUMS", exampleInstallerHashSumFile)
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/dists/squeeze/main/installer-i386/current/images/MANIFEST", exampleInstallerManifestFile)

err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false, 1)
err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(s.downloader.Empty(), Equals, true)

Expand Down Expand Up @@ -395,7 +395,7 @@ func (s *RemoteRepoSuite) TestDownloadWithSources(c *C) {
s.downloader.ExpectError("http://mirror.yandex.ru/debian/dists/squeeze/main/source/Sources.gz", &http.Error{Code: 404})
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/dists/squeeze/main/source/Sources", exampleSourcesFile)

err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false, 1)
err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(s.downloader.Empty(), Equals, true)

Expand Down Expand Up @@ -439,7 +439,7 @@ func (s *RemoteRepoSuite) TestDownloadWithSources(c *C) {
s.downloader.ExpectError("http://mirror.yandex.ru/debian/dists/squeeze/main/source/Sources.gz", &http.Error{Code: 404})
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/dists/squeeze/main/source/Sources", exampleSourcesFile)

err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false, 1)
err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(s.downloader.Empty(), Equals, true)

Expand All @@ -464,7 +464,7 @@ func (s *RemoteRepoSuite) TestDownloadWithSources(c *C) {
s.downloader.ExpectError("http://mirror.yandex.ru/debian/dists/squeeze/main/source/Sources.gz", &http.Error{Code: 404})
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/dists/squeeze/main/source/Sources", exampleSourcesFile)

err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false, 1)
err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, nil, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(s.downloader.Empty(), Equals, true)

Expand All @@ -488,7 +488,7 @@ func (s *RemoteRepoSuite) TestDownloadFlat(c *C) {
err := s.flat.Fetch(downloader, nil)
c.Assert(err, IsNil)

err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true, 1)
err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true)
c.Assert(err, IsNil)
c.Assert(downloader.Empty(), Equals, true)

Expand Down Expand Up @@ -516,7 +516,7 @@ func (s *RemoteRepoSuite) TestDownloadFlat(c *C) {
err = s.flat.Fetch(downloader, nil)
c.Assert(err, IsNil)

err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true, 1)
err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true)
c.Assert(err, IsNil)
c.Assert(downloader.Empty(), Equals, true)

Expand All @@ -538,7 +538,7 @@ func (s *RemoteRepoSuite) TestDownloadFlat(c *C) {
err = s.flat.Fetch(downloader, nil)
c.Assert(err, IsNil)

err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true, 1)
err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true)
c.Assert(err, IsNil)
c.Assert(downloader.Empty(), Equals, true)

Expand Down Expand Up @@ -569,7 +569,7 @@ func (s *RemoteRepoSuite) TestDownloadWithSourcesFlat(c *C) {
err := s.flat.Fetch(downloader, nil)
c.Assert(err, IsNil)

err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true, 1)
err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true)
c.Assert(err, IsNil)
c.Assert(downloader.Empty(), Equals, true)

Expand Down Expand Up @@ -615,7 +615,7 @@ func (s *RemoteRepoSuite) TestDownloadWithSourcesFlat(c *C) {
err = s.flat.Fetch(downloader, nil)
c.Assert(err, IsNil)

err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true, 1)
err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true)
c.Assert(err, IsNil)
c.Assert(downloader.Empty(), Equals, true)

Expand All @@ -641,7 +641,7 @@ func (s *RemoteRepoSuite) TestDownloadWithSourcesFlat(c *C) {
err = s.flat.Fetch(downloader, nil)
c.Assert(err, IsNil)

err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true, 1)
err = s.flat.DownloadPackageIndexes(s.progress, downloader, nil, s.collectionFactory, true)
c.Assert(err, IsNil)
c.Assert(downloader.Empty(), Equals, true)

Expand Down
4 changes: 2 additions & 2 deletions http/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var compressionMethods = []struct {

// DownloadTryCompression tries to download from URL .bz2, .gz and raw extension until
// it finds existing file.
func DownloadTryCompression(ctx context.Context, downloader aptly.Downloader, baseURL *url.URL, path string, expectedChecksums map[string]utils.ChecksumInfo, ignoreMismatch bool, maxTries int) (io.Reader, *os.File, error) {
func DownloadTryCompression(ctx context.Context, downloader aptly.Downloader, baseURL *url.URL, path string, expectedChecksums map[string]utils.ChecksumInfo, ignoreMismatch bool) (io.Reader, *os.File, error) {
var err error

for _, method := range compressionMethods {
Expand All @@ -63,7 +63,7 @@ func DownloadTryCompression(ctx context.Context, downloader aptly.Downloader, ba

if foundChecksum {
expected := expectedChecksums[bestSuffix]
file, err = DownloadTempWithChecksum(ctx, downloader, tryURL.String(), &expected, ignoreMismatch, maxTries)
file, err = DownloadTempWithChecksum(ctx, downloader, tryURL.String(), &expected, ignoreMismatch)
} else {
if !ignoreMismatch {
continue
Expand Down
18 changes: 9 additions & 9 deletions http/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *CompressionSuite) TestDownloadTryCompression(c *C) {
buf = make([]byte, 4)
d := NewFakeDownloader()
d.ExpectResponse("http://example.com/file.bz2", bzipData)
r, file, err := DownloadTryCompression(s.ctx, d, s.baseURL, "file", expectedChecksums, false, 1)
r, file, err := DownloadTryCompression(s.ctx, d, s.baseURL, "file", expectedChecksums, false)
c.Assert(err, IsNil)
defer file.Close()
io.ReadFull(r, buf)
Expand All @@ -56,7 +56,7 @@ func (s *CompressionSuite) TestDownloadTryCompression(c *C) {
d = NewFakeDownloader()
d.ExpectError("http://example.com/file.bz2", &Error{Code: 404})
d.ExpectResponse("http://example.com/file.gz", gzipData)
r, file, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", expectedChecksums, false, 1)
r, file, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", expectedChecksums, false)
c.Assert(err, IsNil)
defer file.Close()
io.ReadFull(r, buf)
Expand All @@ -69,7 +69,7 @@ func (s *CompressionSuite) TestDownloadTryCompression(c *C) {
d.ExpectError("http://example.com/file.bz2", &Error{Code: 404})
d.ExpectError("http://example.com/file.gz", &Error{Code: 404})
d.ExpectResponse("http://example.com/file.xz", xzData)
r, file, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", expectedChecksums, false, 1)
r, file, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", expectedChecksums, false)
c.Assert(err, IsNil)
defer file.Close()
io.ReadFull(r, buf)
Expand All @@ -83,7 +83,7 @@ func (s *CompressionSuite) TestDownloadTryCompression(c *C) {
d.ExpectError("http://example.com/file.gz", &Error{Code: 404})
d.ExpectError("http://example.com/file.xz", &Error{Code: 404})
d.ExpectResponse("http://example.com/file", rawData)
r, file, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", expectedChecksums, false, 1)
r, file, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", expectedChecksums, false)
c.Assert(err, IsNil)
defer file.Close()
io.ReadFull(r, buf)
Expand All @@ -94,7 +94,7 @@ func (s *CompressionSuite) TestDownloadTryCompression(c *C) {
d = NewFakeDownloader()
d.ExpectError("http://example.com/file.bz2", &Error{Code: 404})
d.ExpectResponse("http://example.com/file.gz", "x")
_, _, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", nil, true, 1)
_, _, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", nil, true)
c.Assert(err, ErrorMatches, "unexpected EOF")
c.Assert(d.Empty(), Equals, true)
}
Expand All @@ -112,7 +112,7 @@ func (s *CompressionSuite) TestDownloadTryCompressionLongestSuffix(c *C) {
buf = make([]byte, 4)
d := NewFakeDownloader()
d.ExpectResponse("http://example.com/subdir/file.bz2", bzipData)
r, file, err := DownloadTryCompression(s.ctx, d, s.baseURL, "subdir/file", expectedChecksums, false, 1)
r, file, err := DownloadTryCompression(s.ctx, d, s.baseURL, "subdir/file", expectedChecksums, false)
c.Assert(err, IsNil)
defer file.Close()
io.ReadFull(r, buf)
Expand All @@ -122,15 +122,15 @@ func (s *CompressionSuite) TestDownloadTryCompressionLongestSuffix(c *C) {

func (s *CompressionSuite) TestDownloadTryCompressionErrors(c *C) {
d := NewFakeDownloader()
_, _, err := DownloadTryCompression(s.ctx, d, s.baseURL, "file", nil, true, 1)
_, _, err := DownloadTryCompression(s.ctx, d, s.baseURL, "file", nil, true)
c.Assert(err, ErrorMatches, "unexpected request.*")

d = NewFakeDownloader()
d.ExpectError("http://example.com/file.bz2", &Error{Code: 404})
d.ExpectError("http://example.com/file.gz", &Error{Code: 404})
d.ExpectError("http://example.com/file.xz", &Error{Code: 404})
d.ExpectError("http://example.com/file", errors.New("403"))
_, _, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", nil, true, 1)
_, _, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", nil, true)
c.Assert(err, ErrorMatches, "403")

d = NewFakeDownloader()
Expand All @@ -144,6 +144,6 @@ func (s *CompressionSuite) TestDownloadTryCompressionErrors(c *C) {
"file.xz": {Size: 7},
"file": {Size: 7},
}
_, _, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", expectedChecksums, false, 1)
_, _, err = DownloadTryCompression(s.ctx, d, s.baseURL, "file", expectedChecksums, false)
c.Assert(err, ErrorMatches, "checksums don't match.*")
}
40 changes: 36 additions & 4 deletions http/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -28,12 +29,13 @@ var (
type downloaderImpl struct {
progress aptly.Progress
aggWriter io.Writer
maxTries int
client *http.Client
}

// NewDownloader creates new instance of Downloader which specified number
// of threads and download limit in bytes/sec
func NewDownloader(downLimit int64, progress aptly.Progress) aptly.Downloader {
func NewDownloader(downLimit int64, maxTries int, progress aptly.Progress) aptly.Downloader {
transport := http.Transport{}
transport.Proxy = http.DefaultTransport.(*http.Transport).Proxy
transport.ResponseHeaderTimeout = 30 * time.Second
Expand All @@ -45,6 +47,7 @@ func NewDownloader(downLimit int64, progress aptly.Progress) aptly.Downloader {

downloader := &downloaderImpl{
progress: progress,
maxTries: maxTries,
client: &http.Client{
Transport: &transport,
},
Expand All @@ -71,7 +74,19 @@ func (downloader *downloaderImpl) GetLength(ctx context.Context, url string) (in
return -1, err
}

resp, err := downloader.client.Do(req)
var resp *http.Response

maxTries := downloader.maxTries
for maxTries > 0 {
resp, err = downloader.client.Do(req)
if err != nil && retryableError(err) {
maxTries--
} else {
// stop retrying
break
}
}

if err != nil {
return -1, errors.Wrap(err, url)
}
Expand All @@ -89,10 +104,25 @@ func (downloader *downloaderImpl) GetLength(ctx context.Context, url string) (in

// Download starts new download task
func (downloader *downloaderImpl) Download(ctx context.Context, url string, destination string) error {
return downloader.DownloadWithChecksum(ctx, url, destination, nil, false, 1)
return downloader.DownloadWithChecksum(ctx, url, destination, nil, false)
}

func retryableError(err error) bool {
// unwrap errors.Wrap
err = errors.Cause(err)

// unwrap *url.Error
if wrapped, ok := err.(*url.Error); ok {
err = wrapped.Err
}

switch err {
case io.EOF:
return true
case io.ErrUnexpectedEOF:
return true
}

switch err.(type) {
case *net.OpError:
return true
Expand All @@ -101,6 +131,7 @@ func retryableError(err error) bool {
case net.Error:
return true
}

return false
}

Expand All @@ -123,14 +154,15 @@ func (downloader *downloaderImpl) newRequest(ctx context.Context, method, url st

// DownloadWithChecksum starts new download task with checksum verification
func (downloader *downloaderImpl) DownloadWithChecksum(ctx context.Context, url string, destination string,
expected *utils.ChecksumInfo, ignoreMismatch bool, maxTries int) error {
expected *utils.ChecksumInfo, ignoreMismatch bool) error {

if downloader.progress != nil {
downloader.progress.Printf("Downloading %s...\n", url)
}
req, err := downloader.newRequest(ctx, "GET", url)

var temppath string
maxTries := downloader.maxTries
for maxTries > 0 {
temppath, err = downloader.download(req, url, destination, expected, ignoreMismatch)

Expand Down
Loading

0 comments on commit 5e56ac6

Please sign in to comment.