Skip to content

Commit

Permalink
Merge pull request #586 from mtrmac/mpg-fixes
Browse files Browse the repository at this point in the history
Fix progress bar cleanup, and other small cleanups
  • Loading branch information
rhatdan committed Feb 20, 2019
2 parents 22beff9 + 6ac9e0a commit f30d2c4
Showing 1 changed file with 56 additions and 41 deletions.
97 changes: 56 additions & 41 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/containers/image/transports"
"github.com/containers/image/types"
"github.com/klauspost/pgzip"
"github.com/opencontainers/go-digest"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vbauerster/mpb"
Expand Down Expand Up @@ -91,8 +91,6 @@ type copier struct {
reportWriter io.Writer
progressOutput io.Writer
progressInterval time.Duration
progressPool *mpb.Progress
progressWG *sync.WaitGroup
progress chan types.ProgressProperties
blobInfoCache types.BlobInfoCache
copyInParallel bool
Expand Down Expand Up @@ -168,15 +166,12 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef,
progressOutput = ioutil.Discard
}
copyInParallel := dest.HasThreadSafePutBlob() && rawSource.HasThreadSafeGetBlob()
wg := new(sync.WaitGroup)
c := &copier{
dest: dest,
rawSource: rawSource,
reportWriter: reportWriter,
progressOutput: progressOutput,
progressInterval: options.ProgressInterval,
progressPool: mpb.New(mpb.WithWidth(40), mpb.WithOutput(progressOutput), mpb.WithWaitGroup(wg)),
progressWG: wg,
progress: options.Progress,
copyInParallel: copyInParallel,
// FIXME? The cache is used for sources and destinations equally, but we only have a SourceCtx and DestinationCtx.
Expand Down Expand Up @@ -428,11 +423,6 @@ func (ic *imageCopier) updateEmbeddedDockerReference() error {
return nil
}

// shortDigest returns the first 12 characters of the digest.
func shortDigest(d digest.Digest) string {
return d.Encoded()[:12]
}

// isTTY returns true if the io.Writer is a file and a tty.
func isTTY(w io.Writer) bool {
if f, ok := w.(*os.File); ok {
Expand Down Expand Up @@ -496,27 +486,29 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, bar)
}
data[index] = cld
if bar != nil {
bar.SetTotal(srcLayer.Size, true)
}
bar.SetTotal(srcLayer.Size, true)
}

progressBars := make([]*mpb.Bar, numLayers)
for i, srcInfo := range srcInfos {
progressBars[i] = ic.c.createProgressBar(srcInfo, shortDigest(srcInfo.Digest), "blob")
}
func() { // A scope for defer
progressPool, progressCleanup := ic.c.newProgressPool(ctx)
defer progressCleanup()

for i, srcLayer := range srcInfos {
copySemaphore.Acquire(ctx, 1)
go copyLayerHelper(i, srcLayer, progressBars[i])
}
progressBars := make([]*mpb.Bar, numLayers)
for i, srcInfo := range srcInfos {
progressBars[i] = ic.c.createProgressBar(progressPool, srcInfo, "blob")
}

destInfos := make([]types.BlobInfo, numLayers)
diffIDs := make([]digest.Digest, numLayers)
for i, srcLayer := range srcInfos {
copySemaphore.Acquire(ctx, 1)
go copyLayerHelper(i, srcLayer, progressBars[i])
}

// Wait for all layers to be copied
copyGroup.Wait()
// Wait for all layers to be copied
copyGroup.Wait()
}()

destInfos := make([]types.BlobInfo, numLayers)
diffIDs := make([]digest.Digest, numLayers)
for i, cld := range data {
if cld.err != nil {
return cld.err
Expand Down Expand Up @@ -580,20 +572,40 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte
return nil, err
}

ic.c.progressPool.Wait()
ic.c.Printf("Writing manifest to image destination\n")
if err := ic.c.dest.PutManifest(ctx, manifest); err != nil {
return nil, errors.Wrap(err, "Error writing manifest")
}
return manifest, nil
}

// createProgressBar creates a mpb.Bar. Note that if the copier's reportWriter
// newProgressPool creates a *mpb.Progress and a cleanup function.
// The caller must eventually call the returned cleanup function after the pool will no longer be updated.
func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
ctx, cancel := context.WithCancel(ctx)
pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx))
return pool, func() {
cancel()
pool.Wait()
}
}

// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
func (c *copier) createProgressBar(info types.BlobInfo, name, kind string) *mpb.Bar {
bar := c.progressPool.AddBar(info.Size,
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12

prefix := fmt.Sprintf("Copying %s %s", kind, info.Digest.Encoded())
// Truncate the prefix (chopping of some part of the digest) to make all progress bars aligned in a column.
maxPrefixLen := len("Copying blob ") + shortDigestLen
if len(prefix) > maxPrefixLen {
prefix = prefix[:maxPrefixLen]
}

bar := pool.AddBar(info.Size,
mpb.PrependDecorators(
decor.Name(fmt.Sprintf("Copying %s %s", kind, name)),
decor.Name(prefix),
),
mpb.AppendDecorators(
decor.CountersKibiByte("%.1f / %.1f"),
Expand All @@ -614,14 +626,19 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
return errors.Wrapf(err, "Error reading config blob %s", srcInfo.Digest)
}

// make the short digest only 10 characters long to make it align with the blob output
bar := c.createProgressBar(srcInfo, shortDigest(srcInfo.Digest)[0:10], "config")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
if err != nil {
return types.BlobInfo{}, err
}
bar.SetTotal(int64(len(configBlob)), true)
return destInfo, nil
}()
if err != nil {
return err
}
if bar != nil {
bar.SetTotal(0, true)
return nil
}
if destInfo.Digest != srcInfo.Digest {
return errors.Errorf("Internal error: copying uncompressed config blob %s changed digest to %s", srcInfo.Digest, destInfo.Digest)
Expand Down Expand Up @@ -650,7 +667,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, ba
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", shortDigest(srcInfo.Digest))
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
return blobInfo, cachedDiffID, nil
}
}
Expand Down Expand Up @@ -752,8 +769,6 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc)
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) {
c.progressWG.Add(1)
defer c.progressWG.Done()
// The copying happens through a pipeline of connected io.Readers.
// === Input: srcStream

Expand Down

0 comments on commit f30d2c4

Please sign in to comment.