From 3c979ed3c14df7c0e86ed7846dcd8f87fb97c863 Mon Sep 17 00:00:00 2001 From: b5 Date: Mon, 7 Dec 2020 15:54:31 -0500 Subject: [PATCH] feat(save): emit save events. Print progress bars on save add save event publication to dsfs, move all progress bar output up into cmd package and replace progress bar package with "mpb" with explicit support for displaying multiple progress bars at once. To pull this off lib.Instance gets a new .Bus() accessor method. This setup brings progress bar output closer to the way desktop / API driven events display progress. --- base/archive/archive_test.go | 5 +- base/dsfs/commit.go | 9 ++- base/dsfs/compute_fields.go | 25 +++++++- base/dsfs/dataset.go | 43 +++++++++++-- base/dsfs/dataset_test.go | 65 ++++++++++++++++---- base/dsfs/testdata_test.go | 3 +- base/dsfs/transform_test.go | 5 +- base/save.go | 2 +- base/save_test.go | 8 ++- cmd/print.go | 115 +++++++++++++++++++++++++++++++++++ cmd/qri.go | 8 +++ cmd/save.go | 9 --- event/dataset.go | 28 +++++++++ event/event.go | 25 +++++--- go.mod | 4 +- go.sum | 9 +++ lib/lib.go | 24 +++++--- lib/load_test.go | 2 + remote/client.go | 65 -------------------- remote/mock_client.go | 2 +- repo/test/test_repo.go | 2 +- 21 files changed, 337 insertions(+), 121 deletions(-) diff --git a/base/archive/archive_test.go b/base/archive/archive_test.go index bd7c5177a..92ffff22a 100644 --- a/base/archive/archive_test.go +++ b/base/archive/archive_test.go @@ -12,6 +12,7 @@ import ( "github.com/qri-io/qfs" "github.com/qri-io/qri/base/dsfs" testPeers "github.com/qri-io/qri/config/test" + "github.com/qri-io/qri/event" ) func TestGenerateFilename(t *testing.T) { @@ -132,7 +133,7 @@ func testFS() (qfs.Filesystem, map[string]string, error) { ds.SetBodyFile(dataf) fs := qfs.NewMemFS() - dskey, err := dsfs.WriteDataset(ctx, nil, fs, ds, pk, dsfs.SaveSwitches{}) + dskey, err := dsfs.WriteDataset(ctx, nil, fs, event.NilBus, ds, pk, dsfs.SaveSwitches{}) if err != nil { return fs, ns, err } @@ -179,7 +180,7 @@ func testFSWithVizAndTransform() (qfs.Filesystem, map[string]string, error) { privKey := testPeers.GetTestPeerInfo(10).PrivKey var dsLk sync.Mutex - dskey, err := dsfs.WriteDataset(ctx, &dsLk, st, ds, privKey, dsfs.SaveSwitches{Pin: true}) + dskey, err := dsfs.WriteDataset(ctx, &dsLk, st, event.NilBus, ds, privKey, dsfs.SaveSwitches{Pin: true}) if err != nil { return st, ns, err } diff --git a/base/dsfs/commit.go b/base/dsfs/commit.go index feebd2741..8e34f3250 100644 --- a/base/dsfs/commit.go +++ b/base/dsfs/commit.go @@ -16,6 +16,7 @@ import ( "github.com/qri-io/qfs" "github.com/qri-io/qri/base/friendly" "github.com/qri-io/qri/base/toqtype" + "github.com/qri-io/qri/event" ) // Timestamp is an function for getting commit timestamps @@ -62,13 +63,19 @@ func loadCommit(ctx context.Context, fs qfs.Filesystem, path string) (st *datase return dataset.UnmarshalCommit(data) } -func commitFileAddFunc(privKey crypto.PrivKey) addWriteFileFunc { +func commitFileAddFunc(privKey crypto.PrivKey, pub event.Publisher) addWriteFileFunc { return func(ds *dataset.Dataset, wfs *writeFiles) error { if ds.Commit == nil { return nil } hook := func(ctx context.Context, f qfs.File, added map[string]string) (io.Reader, error) { + go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveProgress, event.DsSaveEvent{ + Username: ds.Peername, + Name: ds.Name, + Message: "finalizing", + Completion: 0.9, + }) if cff, ok := wfs.body.(*computeFieldsFile); ok { updateScriptPaths(ds, added) diff --git a/base/dsfs/compute_fields.go b/base/dsfs/compute_fields.go index 46945ef42..cc3ef9cae 100644 --- a/base/dsfs/compute_fields.go +++ b/base/dsfs/compute_fields.go @@ -14,6 +14,7 @@ import ( "github.com/qri-io/dataset/dsstats" "github.com/qri-io/jsonschema" "github.com/qri-io/qfs" + "github.com/qri-io/qri/event" ) type computeFieldsFile struct { @@ -48,7 +49,15 @@ var ( _ statsComponentFile = (*computeFieldsFile)(nil) ) -func newComputeFieldsFile(ctx context.Context, dsLk *sync.Mutex, fs qfs.Filesystem, pk crypto.PrivKey, ds, prev *dataset.Dataset, sw SaveSwitches) (qfs.File, error) { +func newComputeFieldsFile( + ctx context.Context, + dsLk *sync.Mutex, + fs qfs.Filesystem, + pub event.Publisher, + pk crypto.PrivKey, + ds *dataset.Dataset, + prev *dataset.Dataset, + sw SaveSwitches) (qfs.File, error) { var ( bf = ds.BodyFile() bfPrev qfs.File @@ -84,7 +93,7 @@ func newComputeFieldsFile(ctx context.Context, dsLk *sync.Mutex, fs qfs.Filesyst done: make(chan error), } - go cff.handleRows(ctx) + go cff.handleRows(ctx, pub) return cff, nil } @@ -151,7 +160,7 @@ func (cff *computeFieldsFile) StatsComponent() (*dataset.Stats, error) { }, nil } -func (cff *computeFieldsFile) handleRows(ctx context.Context) { +func (cff *computeFieldsFile) handleRows(ctx context.Context, pub event.Publisher) { var ( batchBuf *dsio.EntryBuffer st = cff.ds.Structure @@ -201,6 +210,16 @@ func (cff *computeFieldsFile) handleRows(ctx context.Context) { return } + // publish here so we know that if the user sees the "processing body file" + // message, we know that a compute-fields-file has made it all the way through + // setup + go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveProgress, event.DsSaveEvent{ + Username: cff.ds.Peername, + Name: cff.ds.Name, + Message: "processing body file", + Completion: 0.1, + }) + go func() { err = dsio.EachEntry(r, func(i int, ent dsio.Entry, err error) error { if err != nil { diff --git a/base/dsfs/dataset.go b/base/dsfs/dataset.go index 4b388932f..eaa914572 100644 --- a/base/dsfs/dataset.go +++ b/base/dsfs/dataset.go @@ -11,6 +11,7 @@ import ( "github.com/qri-io/dataset" "github.com/qri-io/dataset/validate" "github.com/qri-io/qfs" + "github.com/qri-io/qri/event" ) // number of entries to per batch when processing body data in WriteDataset @@ -142,6 +143,7 @@ func CreateDataset( ctx context.Context, source qfs.Filesystem, destination qfs.Filesystem, + pub event.Publisher, ds *dataset.Dataset, prev *dataset.Dataset, pk crypto.PrivKey, @@ -192,17 +194,34 @@ func CreateDataset( } // lock for editing dataset pointer - var dsLk = &sync.Mutex{} + var ( + dsLk = &sync.Mutex{} + peername = ds.Peername + name = ds.Name + ) - bodyFile, err := newComputeFieldsFile(ctx, dsLk, source, pk, ds, prev, sw) + bodyFile, err := newComputeFieldsFile(ctx, dsLk, source, pub, pk, ds, prev, sw) if err != nil { return "", err } ds.SetBodyFile(bodyFile) - path, err := WriteDataset(ctx, dsLk, destination, ds, pk, sw) + go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveStarted, event.DsSaveEvent{ + Username: peername, + Name: name, + Message: "save started", + Completion: 0, + }) + + path, err := WriteDataset(ctx, dsLk, destination, pub, ds, pk, sw) if err != nil { log.Debug(err.Error()) + event.PublishLogError(ctx, pub, log, event.ETDatasetSaveCompleted, event.DsSaveEvent{ + Username: peername, + Name: name, + Error: err, + Completion: 1.0, + }) return "", err } @@ -211,9 +230,22 @@ func CreateDataset( // the caller doesn't use the ds arg afterward // might make sense to have a wrapper function that writes and loads on success if err := DerefDataset(ctx, destination, ds); err != nil { + event.PublishLogError(ctx, pub, log, event.ETDatasetSaveCompleted, event.DsSaveEvent{ + Username: peername, + Name: name, + Error: err, + Completion: 1.0, + }) return path, err } - return path, nil + + return path, pub.Publish(ctx, event.ETDatasetSaveCompleted, event.DsSaveEvent{ + Username: peername, + Name: name, + Message: "dataset saved", + Path: path, + Completion: 1.0, + }) } // WriteDataset persists a datasets to a destination filesystem @@ -221,6 +253,7 @@ func WriteDataset( ctx context.Context, dsLk *sync.Mutex, destination qfs.Filesystem, + pub event.Publisher, ds *dataset.Dataset, pk crypto.PrivKey, sw SaveSwitches, @@ -240,7 +273,7 @@ func WriteDataset( addStatsFile, addReadmeFile, vizFilesAddFunc(destination, sw), - commitFileAddFunc(pk), + commitFileAddFunc(pk, pub), addDatasetFile, } diff --git a/base/dsfs/dataset_test.go b/base/dsfs/dataset_test.go index 19c073a2f..d966273f7 100644 --- a/base/dsfs/dataset_test.go +++ b/base/dsfs/dataset_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/qri-io/dataset" "github.com/qri-io/dataset/dsio" "github.com/qri-io/dataset/dstest" @@ -20,6 +21,7 @@ import ( "github.com/qri-io/qfs" "github.com/qri-io/qri/base/toqtype" testPeers "github.com/qri-io/qri/config/test" + "github.com/qri-io/qri/event" ) func TestLoadDataset(t *testing.T) { @@ -47,7 +49,7 @@ func TestLoadDataset(t *testing.T) { info := testPeers.GetTestPeerInfo(10) pk := info.PrivKey - apath, err := WriteDataset(ctx, &sync.Mutex{}, fs, ds, pk, SaveSwitches{}) + apath, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, ds, pk, SaveSwitches{}) if err != nil { t.Errorf(err.Error()) return @@ -156,7 +158,7 @@ func TestCreateDataset(t *testing.T) { t.Fatalf("creating test case: %s", err) } - _, err = CreateDataset(ctx, fs, fs, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true}) + _, err = CreateDataset(ctx, fs, fs, event.NilBus, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true}) if err == nil { t.Fatalf("CreateDataset expected error. got nil") } @@ -189,7 +191,7 @@ func TestCreateDataset(t *testing.T) { t.Fatalf("creating test case: %s", err) } - path, err := CreateDataset(ctx, fs, fs, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true}) + path, err := CreateDataset(ctx, fs, fs, event.NilBus, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true}) if err != nil { t.Fatalf("CreateDataset: %s", err) } @@ -218,7 +220,7 @@ func TestCreateDataset(t *testing.T) { } t.Run("no_priv_key", func(t *testing.T) { - _, err := CreateDataset(ctx, fs, fs, nil, nil, nil, SaveSwitches{ShouldRender: true}) + _, err := CreateDataset(ctx, fs, fs, event.NilBus, nil, nil, nil, SaveSwitches{ShouldRender: true}) if err == nil { t.Fatal("expected call without prvate key to error") } @@ -242,7 +244,7 @@ func TestCreateDataset(t *testing.T) { t.Errorf("case nil body and previous body files, error reading data file: %s", err.Error()) } expectedErr := "bodyfile or previous bodyfile needed" - _, err = CreateDataset(ctx, fs, fs, ds, nil, privKey, SaveSwitches{ShouldRender: true}) + _, err = CreateDataset(ctx, fs, fs, event.NilBus, ds, nil, privKey, SaveSwitches{ShouldRender: true}) if err.Error() != expectedErr { t.Errorf("case nil body and previous body files, error mismatch: expected '%s', got '%s'", expectedErr, err.Error()) } @@ -272,7 +274,7 @@ func TestCreateDataset(t *testing.T) { } ds.SetBodyFile(qfs.NewMemfileBytes("body.csv", bodyBytes)) - path, err := CreateDataset(ctx, fs, fs, ds, dsPrev, privKey, SaveSwitches{ShouldRender: true}) + path, err := CreateDataset(ctx, fs, fs, event.NilBus, ds, dsPrev, privKey, SaveSwitches{ShouldRender: true}) if err != nil && err.Error() != expectedErr { t.Fatalf("mismatch: expected %q, got %q", expectedErr, err.Error()) } else if err == nil { @@ -312,7 +314,7 @@ func TestDatasetSaveCustomTimestamp(t *testing.T) { } ds.SetBodyFile(qfs.NewMemfileBytes("/body.json", []byte(`[]`))) - path, err := CreateDataset(ctx, fs, fs, ds, nil, privKey, SaveSwitches{}) + path, err := CreateDataset(ctx, fs, fs, event.NilBus, ds, nil, privKey, SaveSwitches{}) if err != nil { t.Fatal(err) } @@ -324,6 +326,47 @@ func TestDatasetSaveCustomTimestamp(t *testing.T) { } } +func TestDatasetSaveEvents(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fs := qfs.NewMemFS() + privKey := testPeers.GetTestPeerInfo(10).PrivKey + bus := event.NewBus(ctx) + + fired := map[event.Type]int{} + bus.Subscribe(func(ctx context.Context, t event.Type, payload interface{}) error { + fired[t]++ + return nil + }, + event.ETDatasetSaveStarted, + event.ETDatasetSaveProgress, + event.ETDatasetSaveCompleted, + ) + + ds := &dataset.Dataset{ + Commit: &dataset.Commit{ + Timestamp: time.Date(2100, 1, 2, 3, 4, 5, 6, time.Local), + }, + Structure: &dataset.Structure{Format: "json", Schema: dataset.BaseSchemaArray}, + } + ds.SetBodyFile(qfs.NewMemfileBytes("/body.json", []byte(`[]`))) + + if _, err := CreateDataset(ctx, fs, fs, bus, ds, nil, privKey, SaveSwitches{}); err != nil { + t.Fatal(err) + } + + expect := map[event.Type]int{ + event.ETDatasetSaveStarted: 1, + event.ETDatasetSaveProgress: 2, + event.ETDatasetSaveCompleted: 1, + } + + if diff := cmp.Diff(expect, fired); diff != "" { + t.Errorf("fired event count mismatch. (-want +got):%s\n", diff) + } +} + // BaseTabularSchema is the base schema for tabular data // NOTE: Do not use if possible, prefer github.com/qri-io/dataset/tabular // TODO(dustmop): Possibly move this to tabular package @@ -374,7 +417,7 @@ func TestCreateDatasetBodyTooLarge(t *testing.T) { } nextDs.SetBodyFile(qfs.NewMemfileBytes(testBodyPath, testBodyBytes)) - path, err := CreateDataset(ctx, fs, fs, &nextDs, &prevDs, privKey, SaveSwitches{ShouldRender: true}) + path, err := CreateDataset(ctx, fs, fs, event.NilBus, &nextDs, &prevDs, privKey, SaveSwitches{ShouldRender: true}) if err != nil { t.Fatalf("CreateDataset: %s", err) } @@ -403,7 +446,7 @@ func TestWriteDataset(t *testing.T) { info := testPeers.GetTestPeerInfo(10) pk := info.PrivKey - if _, err := WriteDataset(ctx, &sync.Mutex{}, fs, &dataset.Dataset{}, pk, SaveSwitches{Pin: true}); err == nil || err.Error() != "cannot save empty dataset" { + if _, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, &dataset.Dataset{}, pk, SaveSwitches{Pin: true}); err == nil || err.Error() != "cannot save empty dataset" { t.Errorf("didn't reject empty dataset: %s", err) } @@ -426,7 +469,7 @@ func TestWriteDataset(t *testing.T) { ds := tc.Input - got, err := WriteDataset(ctx, &sync.Mutex{}, fs, ds, pk, SaveSwitches{Pin: true}) + got, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, ds, pk, SaveSwitches{Pin: true}) if !(err == nil && c.err == "" || err != nil && err.Error() == c.err) { t.Errorf("case %d error mismatch. expected: '%s', got: '%s'", i, c.err, err) continue @@ -1011,7 +1054,7 @@ func BenchmarkCreateDatasetCSV(b *testing.B) { _, dataset := GenerateDataset(b, sampleSize, "csv") b.StartTimer() - _, err := CreateDataset(ctx, fs, fs, dataset, nil, privKey, SaveSwitches{ShouldRender: true}) + _, err := CreateDataset(ctx, fs, fs, event.NilBus, dataset, nil, privKey, SaveSwitches{ShouldRender: true}) if err != nil { b.Errorf("error creating dataset: %s", err.Error()) } diff --git a/base/dsfs/testdata_test.go b/base/dsfs/testdata_test.go index 771d40ee3..af3ea706e 100644 --- a/base/dsfs/testdata_test.go +++ b/base/dsfs/testdata_test.go @@ -10,6 +10,7 @@ import ( "github.com/qri-io/dataset" "github.com/qri-io/qfs" testPeers "github.com/qri-io/qri/config/test" + "github.com/qri-io/qri/event" ) var AirportCodes = &dataset.Dataset{ @@ -178,7 +179,7 @@ func makeFilestore() (map[string]string, qfs.Filesystem, error) { ds.SetBodyFile(qfs.NewMemfileBytes(fmt.Sprintf("/body.%s", ds.Structure.Format), data)) - dskey, err := WriteDataset(ctx, &sync.Mutex{}, fs, ds, pk, SaveSwitches{Pin: true}) + dskey, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, ds, pk, SaveSwitches{Pin: true}) if err != nil { return datasets, nil, fmt.Errorf("dataset: %s write error: %s", k, err.Error()) } diff --git a/base/dsfs/transform_test.go b/base/dsfs/transform_test.go index e4587b96f..1bf1cb05b 100644 --- a/base/dsfs/transform_test.go +++ b/base/dsfs/transform_test.go @@ -10,6 +10,7 @@ import ( "github.com/qri-io/dataset/dstest" "github.com/qri-io/qfs" testPeers "github.com/qri-io/qri/config/test" + "github.com/qri-io/qri/event" ) func TestLoadTransform(t *testing.T) { @@ -45,7 +46,7 @@ func TestLoadTransformScript(t *testing.T) { t.Fatal(err.Error()) } - path, err := CreateDataset(ctx, fs, fs, tc.Input, nil, privKey, SaveSwitches{Pin: true, ShouldRender: true}) + path, err := CreateDataset(ctx, fs, fs, event.NilBus, tc.Input, nil, privKey, SaveSwitches{Pin: true, ShouldRender: true}) if err != nil { t.Fatal(err.Error()) } @@ -64,7 +65,7 @@ func TestLoadTransformScript(t *testing.T) { t.Fatal(err.Error()) } tc.Input.Transform.ScriptPath = transformPath - path, err = CreateDataset(ctx, fs, fs, tc.Input, nil, privKey, SaveSwitches{Pin: true, ShouldRender: true}) + path, err = CreateDataset(ctx, fs, fs, event.NilBus, tc.Input, nil, privKey, SaveSwitches{Pin: true, ShouldRender: true}) if err != nil { t.Fatal(err.Error()) } diff --git a/base/save.go b/base/save.go index a18c47e6e..bc4375793 100644 --- a/base/save.go +++ b/base/save.go @@ -146,7 +146,7 @@ func CreateDataset(ctx context.Context, r repo.Repo, writeDest qfs.Filesystem, d return } - if path, err = dsfs.CreateDataset(ctx, r.Filesystem(), writeDest, ds, dsPrev, r.PrivateKey(), sw); err != nil { + if path, err = dsfs.CreateDataset(ctx, r.Filesystem(), writeDest, r.Bus(), ds, dsPrev, r.PrivateKey(), sw); err != nil { log.Debugf("dsfs.CreateDataset: %s", err) return nil, err } diff --git a/base/save_test.go b/base/save_test.go index 816e663c5..9dcf7613c 100644 --- a/base/save_test.go +++ b/base/save_test.go @@ -1,6 +1,7 @@ package base import ( + "bytes" "context" "testing" @@ -97,14 +98,16 @@ func TestSaveDatasetReplace(t *testing.T) { } func TestCreateDataset(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fs, err := muxfs.New(ctx, []qfs.Config{ {Type: "mem"}, }) if err != nil { t.Fatal(err) } - r, err := repo.NewMemRepoWithProfile(ctx, testPeerProfile, fs, event.NilBus) + r, err := repo.NewMemRepoWithProfile(ctx, testPeerProfile, fs, event.NewBus(ctx)) if err != nil { t.Fatal(err.Error()) } @@ -120,6 +123,7 @@ func TestCreateDataset(t *testing.T) { }, } ds.SetBodyFile(qfs.NewMemfileBytes("/body.json", []byte("[]"))) + PrintProgressBarsOnSave(&bytes.Buffer{}, r.Bus()) if _, err := CreateDataset(ctx, r, r.Filesystem().DefaultWriteFS(), &dataset.Dataset{}, &dataset.Dataset{}, SaveSwitches{Pin: true, ShouldRender: true}); err == nil { t.Error("expected bad dataset to error") diff --git a/cmd/print.go b/cmd/print.go index dd332923e..5ce8c58c8 100644 --- a/cmd/print.go +++ b/cmd/print.go @@ -2,19 +2,25 @@ package cmd import ( "bytes" + "context" "errors" "fmt" "io" + "math" "os" "os/exec" "runtime" "strings" + "sync" "github.com/fatih/color" "github.com/olekukonko/tablewriter" "github.com/qri-io/deepdiff" qrierr "github.com/qri-io/qri/errors" + "github.com/qri-io/qri/event" "github.com/qri-io/qri/lib" + "github.com/vbauerster/mpb/v5" + "github.com/vbauerster/mpb/v5/decor" ) var noPrompt = false @@ -230,3 +236,112 @@ func renderTable(writer io.Writer, header []string, data [][]string) { table.AppendBulk(data) table.Render() } + +// PrintProgressBarsOnEvents writes save progress data to the given writer +func PrintProgressBarsOnEvents(w io.Writer, bus event.Bus) { + var lock sync.Mutex + // initialize progress container, with custom width + p := mpb.New(mpb.WithWidth(80)) + progress := map[string]*mpb.Bar{} + + // wire up a subscription to print download progress to streams + bus.Subscribe(func(_ context.Context, typ event.Type, payload interface{}) error { + lock.Lock() + defer lock.Unlock() + + switch evt := payload.(type) { + case event.DsSaveEvent: + evtID := fmt.Sprintf("%s/%s", evt.Username, evt.Name) + cpl := int64(math.Ceil(evt.Completion * 100)) + + switch typ { + case event.ETDatasetSaveStarted: + bar, exists := progress[evtID] + if !exists { + bar = addElapsedBar(p, 100, "saving") + progress[evtID] = bar + } + bar.SetCurrent(cpl) + case event.ETDatasetSaveProgress: + bar, exists := progress[evtID] + if !exists { + bar = addElapsedBar(p, 100, "saving") + progress[evtID] = bar + } + bar.SetCurrent(cpl) + case event.ETDatasetSaveCompleted: + if bar, exists := progress[evtID]; exists { + bar.SetTotal(100, true) + delete(progress, evtID) + } + } + case event.RemoteEvent: + switch typ { + case event.ETRemoteClientPushVersionProgress: + bar, exists := progress[evt.Ref.String()] + if !exists { + bar = addBar(p, int64(len(evt.Progress)), "pushing") + progress[evt.Ref.String()] = bar + } + bar.SetCurrent(int64(evt.Progress.CompletedBlocks())) + case event.ETRemoteClientPushVersionCompleted: + if bar, exists := progress[evt.Ref.String()]; exists { + bar.SetCurrent(int64(len(evt.Progress))) + delete(progress, evt.Ref.String()) + } + + case event.ETRemoteClientPullVersionProgress: + bar, exists := progress[evt.Ref.String()] + if !exists { + bar = addBar(p, int64(len(evt.Progress)), "pulling") + progress[evt.Ref.String()] = bar + } + bar.SetCurrent(int64(evt.Progress.CompletedBlocks())) + case event.ETRemoteClientPullVersionCompleted: + if bar, exists := progress[evt.Ref.String()]; exists { + bar.SetCurrent(int64(len(evt.Progress))) + delete(progress, evt.Ref.String()) + } + } + } + + if len(progress) == 0 { + p.Wait() + p = mpb.New(mpb.WithWidth(80)) + } + return nil + }, + event.ETDatasetSaveStarted, + event.ETDatasetSaveProgress, + event.ETDatasetSaveCompleted, + + event.ETRemoteClientPushVersionProgress, + event.ETRemoteClientPushVersionCompleted, + event.ETRemoteClientPullVersionProgress, + event.ETRemoteClientPullVersionCompleted, + ) +} + +func addBar(p *mpb.Progress, total int64, title string) *mpb.Bar { + return p.AddBar(100, + mpb.PrependDecorators( + // display our name with one space on the right + decor.Name(title, decor.WC{W: len(title) + 1, C: decor.DidentRight}), + // replace ETA decorator with "done" message, OnComplete event + decor.OnComplete( + decor.AverageETA(decor.ET_STYLE_GO, decor.WC{W: 4}), "done", + ), + )) +} + +func addElapsedBar(p *mpb.Progress, total int64, title string) *mpb.Bar { + return p.AddBar(100, + mpb.PrependDecorators( + // display our name with one space on the right + decor.Name(title, decor.WC{W: len(title) + 1, C: decor.DidentRight}), + // replace ETA decorator with "done" message, OnComplete event + decor.OnComplete( + decor.Elapsed(decor.ET_STYLE_GO, decor.WC{W: 4}), "done", + ), + )) +} diff --git a/cmd/qri.go b/cmd/qri.go index db99d8318..00ae2104a 100644 --- a/cmd/qri.go +++ b/cmd/qri.go @@ -161,6 +161,14 @@ func (o *QriOptions) Init() (err error) { } setNoColor(!shouldColorOutput) + // TODO (b5) - this is a hack to make progress bars not show up while running + // tests. It does have the real-world implication that "shouldColorOutput" + // being false also disables progress bars, which may be what we want (ahem: TTY + // detection), but even if so, isn't the right use of this variable name + if shouldColorOutput { + PrintProgressBarsOnEvents(o.IOStreams.ErrOut, o.inst.Bus()) + } + log.Debugf("running cmd %q", os.Args) return diff --git a/cmd/save.go b/cmd/save.go index 1bdc0627b..c35f15e65 100644 --- a/cmd/save.go +++ b/cmd/save.go @@ -145,9 +145,6 @@ func (o *SaveOptions) Validate() error { func (o *SaveOptions) Run() (err error) { printRefSelect(o.ErrOut, o.Refs) - o.StartSpinner() - defer o.StopSpinner() - p := &lib.SaveParams{ Ref: o.Refs.Ref(), BodyPath: o.BodyPath, @@ -169,17 +166,12 @@ func (o *SaveOptions) Run() (err error) { } if o.Secrets != nil { - // Stop the spinner so the user can see the prompt, and the answer they type will - // not be erased. Output the message to error stream in case stdout is captured. - o.StopSpinner() if !confirm(o.ErrOut, o.In, ` Warning: You are providing secrets to a dataset transformation. Never provide secrets to a transformation you do not trust. continue?`, true) { return } - // Restart the spinner. - o.StartSpinner() if p.Secrets, err = parseSecrets(o.Secrets...); err != nil { return err } @@ -190,7 +182,6 @@ continue?`, true) { return err } - o.StopSpinner() ref := dsref.ConvertDatasetToVersionInfo(res).SimpleRef() ref.ProfileID = "" printSuccess(o.ErrOut, "dataset saved: %s", ref.String()) diff --git a/event/dataset.go b/event/dataset.go index 3c91c6b0e..52b1b9d39 100644 --- a/event/dataset.go +++ b/event/dataset.go @@ -20,6 +20,19 @@ const ( // ETDatasetCreateLink is when a dataset is linked to a working directory // payload is a DsChange ETDatasetCreateLink = Type("dataset:CreateLink") + + // ETDatasetSaveStarted fires when saving a dataset starts + // subscriptions do not block the publisher + // payload will be a DsSaveEvent + ETDatasetSaveStarted = Type("dataset:SaveStarted") + // ETDatasetSaveProgress indicates a change in progress of dataset version + // creation. + // subscriptions do not block the publisher + // payload will be a DsSaveEvent + ETDatasetSaveProgress = Type("dataset:SaveProgress") + // ETDatasetSaveCompleted indicates creating a dataset version finished + // payload will be a DsSaveEvent + ETDatasetSaveCompleted = Type("dataset:SaveCompleted") ) // DsChange represents the result of a change to a dataset @@ -33,3 +46,18 @@ type DsChange struct { Info *dsref.VersionInfo `json:"info"` Dir string `json:"dir"` } + +// DsSaveEvent represents a change in version creation progress +type DsSaveEvent struct { + Username string `json:"username"` + Name string `json:"name"` + // either message or error will be populated. message should be human-centric + // description of progress + Message string `json:"message"` + // saving error. only populated on failed ETSaveDatasetCompleted event + Error error `json:"error,omitempty"` + // completion pct from 0-1 + Completion float64 `json:"complete"` + // only populated on successful ETDatasetSaveCompleted + Path string `json:"path,omitempty"` +} diff --git a/event/event.go b/event/event.go index b2c55f14a..23d872c59 100644 --- a/event/event.go +++ b/event/event.go @@ -40,16 +40,26 @@ type Publisher interface { Publish(ctx context.Context, t Type, payload interface{}) error } +// PublishLogError is a convenience function that fires an event to a publisher +// and logs any error using an external logger. Exists to make publishing in a +// goroutine a one-liner, eg: +// go event.PublishLogErr(ctx, pub, log, event.ETSaveStarted, payload) +func PublishLogError(ctx context.Context, pub Publisher, logger golog.StandardLogger, t Type, payload interface{}) { + if err := pub.Publish(ctx, t, payload); err != nil { + logger.Debug(err) + } +} + // Bus is a central coordination point for event publication and subscription -// zero or more subscribers register topics to be notified of, a publisher +// zero or more subscribers register eventTypes to be notified of, a publisher // writes a topic event to the bus, which broadcasts to all subscribers of that // topic type Bus interface { // Publish an event to the bus Publish(ctx context.Context, t Type, data interface{}) error - // Subscribe to one or more topics with a handler function that will be called + // Subscribe to one or more eventTypes with a handler function that will be called // whenever the event topic is published - Subscribe(handler Handler, topics ...Type) + Subscribe(handler Handler, eventTypes ...Type) // NumSubscriptions returns the number of subscribers to the bus's events NumSubscribers() int } @@ -68,7 +78,7 @@ func (nilBus) Publish(_ context.Context, _ Type, _ interface{}) error { return nil } -func (nilBus) Subscribe(handler Handler, topics ...Type) {} +func (nilBus) Subscribe(handler Handler, eventTypes ...Type) {} func (nilBus) NumSubscribers() int { return 0 @@ -108,6 +118,7 @@ func NewBus(ctx context.Context) Bus { func (b *bus) Publish(ctx context.Context, topic Type, data interface{}) error { b.lk.RLock() defer b.lk.RUnlock() + log.Debugw("publish", "topic", topic, "payload", data) if b.closed { return ErrBusClosed @@ -123,12 +134,12 @@ func (b *bus) Publish(ctx context.Context, topic Type, data interface{}) error { } // Subscribe requests events from the given topic, returning a channel of those events -func (b *bus) Subscribe(handler Handler, topics ...Type) { +func (b *bus) Subscribe(handler Handler, eventTypes ...Type) { b.lk.Lock() defer b.lk.Unlock() - log.Debugf("Subscribe: %v", topics) + log.Debugf("Subscribe: %v", eventTypes) - for _, topic := range topics { + for _, topic := range eventTypes { b.subs[topic] = append(b.subs[topic], handler) } } diff --git a/go.mod b/go.mod index 3644e4add..6e400d3ac 100644 --- a/go.mod +++ b/go.mod @@ -53,10 +53,12 @@ require ( github.com/spf13/cobra v1.0.0 github.com/theckman/go-flock v0.7.1 github.com/ugorji/go/codec v1.1.7 + github.com/vbauerster/mpb v3.4.0+incompatible + github.com/vbauerster/mpb/v5 v5.3.0 go.starlark.net v0.0.0-20200619143648-50ca820fafb9 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/net v0.0.0-20200707034311-ab3426394381 - golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae + golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed golang.org/x/text v0.3.3 gopkg.in/yaml.v2 v2.3.0 nhooyr.io/websocket v1.8.6 diff --git a/go.sum b/go.sum index c5b0aa732..329e1b44f 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,8 @@ github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cB github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s= github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alangpierce/go-forceexport v0.0.0-20160317203124-8f1d6941cd75/go.mod h1:uAXEEpARkRhCZfEvy/y0Jcc888f9tHCc1W7/UeEtreE= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -1242,6 +1244,11 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= +github.com/vbauerster/mpb v1.1.3 h1:IRgic8VFaURXkW0VxDLkNOiNaAgtw0okB2YIaVvJDI4= +github.com/vbauerster/mpb v3.4.0+incompatible h1:mfiiYw87ARaeRW6x5gWwYRUawxaW1tLAD8IceomUCNw= +github.com/vbauerster/mpb v3.4.0+incompatible/go.mod h1:zAHG26FUhVKETRu+MWqYXcI70POlC6N8up9p1dID7SU= +github.com/vbauerster/mpb/v5 v5.3.0 h1:vgrEJjUzHaSZKDRRxul5Oh4C72Yy/5VEMb0em+9M0mQ= +github.com/vbauerster/mpb/v5 v5.3.0/go.mod h1:4yTkvAb8Cm4eylAp6t0JRq6pXDkFJ4krUlDqWYkakAs= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= @@ -1482,6 +1489,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed h1:WBkVNH1zd9jg/dK4HCM4lNANnmd12EHC9z+LmcCG4ns= +golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/lib/lib.go b/lib/lib.go index 60aea7513..beeb0c6f4 100644 --- a/lib/lib.go +++ b/lib/lib.go @@ -446,6 +446,7 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins if inst.repo == nil { if inst.repo, err = buildrepo.New(ctx, inst.repoPath, cfg, func(o *buildrepo.Options) { + o.Bus = inst.bus o.Filesystem = inst.qfs o.Profiles = inst.profiles o.Logbook = inst.logbook @@ -456,6 +457,10 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins } } + // Try to make the repo a hidden directory, but it's okay if we can't. Ignore the error. + _ = hiddenfile.SetFileHidden(inst.repoPath) + inst.fsi = fsi.NewFSI(inst.repo, inst.bus) + if o.statsCache != nil { inst.stats = stats.New(o.statsCache) } else if inst.stats == nil { @@ -464,12 +469,6 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins } } - if inst.repo != nil { - // Try to make the repo a hidden directory, but it's okay if we can't. Ignore the error. - _ = hiddenfile.SetFileHidden(inst.repoPath) - inst.fsi = fsi.NewFSI(inst.repo, inst.bus) - } - if inst.dscache == nil { inst.dscache, err = newDscache(ctx, inst.qfs, inst.bus, pro.Peername, inst.repoPath) if err != nil { @@ -492,7 +491,7 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins // Check if this is coming from a test, which is requesting a MockRemoteClient. key := InstanceContextKey("RemoteClient") if v := ctx.Value(key); v != nil && v == "mock" && inst.node != nil { - inst.node.LocalStreams = o.Streams + inst.node.LocalStreams = inst.streams if inst.remoteClient, err = remote.NewMockClient(ctx, inst.node, inst.logbook); err != nil { return } @@ -503,14 +502,13 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins }() } else if inst.node != nil { - inst.node.LocalStreams = o.Streams + inst.node.LocalStreams = inst.streams if _, e := inst.node.IPFSCoreAPI(); e == nil { if inst.remoteClient, err = remote.NewClient(ctx, inst.node, inst.bus); err != nil { log.Error("initializing remote client:", err.Error()) return } - remote.PrintProgressBarsOnPushPull(inst.streams.ErrOut, inst.bus) go func() { inst.releasers.Add(1) <-inst.remoteClient.Done() @@ -874,6 +872,14 @@ func (inst *Instance) RemoteClient() remote.Client { return inst.remoteClient } +// Bus exposes the instance event bus +func (inst *Instance) Bus() event.Bus { + if inst == nil { + return nil + } + return inst.bus +} + // checkRPCError validates RPC errors and in case of EOF returns a // more user friendly message func checkRPCError(err error) error { diff --git a/lib/load_test.go b/lib/load_test.go index ea0949069..3f003b6b2 100644 --- a/lib/load_test.go +++ b/lib/load_test.go @@ -7,6 +7,7 @@ import ( "github.com/qri-io/qri/base/dsfs" "github.com/qri-io/qri/dsref" dsrefspec "github.com/qri-io/qri/dsref/spec" + "github.com/qri-io/qri/event" ) func TestLoadDataset(t *testing.T) { @@ -24,6 +25,7 @@ func TestLoadDataset(t *testing.T) { tr.Ctx, fs, fs.DefaultWriteFS(), + event.NilBus, ds, nil, tr.Instance.repo.PrivateKey(), diff --git a/remote/client.go b/remote/client.go index 0fa5f1311..7a45c7bdb 100644 --- a/remote/client.go +++ b/remote/client.go @@ -4,15 +4,12 @@ import ( "context" "encoding/json" "fmt" - "io" "io/ioutil" "net/http" "net/url" "strings" - "sync" "time" - "github.com/cheggaaa/pb/v3" coreiface "github.com/ipfs/interface-go-ipfs-core" crypto "github.com/libp2p/go-libp2p-core/crypto" peer "github.com/libp2p/go-libp2p-core/peer" @@ -754,65 +751,3 @@ func (c *client) Done() <-chan struct{} { func (c *client) DoneErr() error { return c.doneErr } - -// PrintProgressBarsOnPushPull writes progress data to the given writer on -// push & pull. requires the event bus that a remote.Client is publishing on -func PrintProgressBarsOnPushPull(w io.Writer, bus event.Bus) { - var lock sync.Mutex - progress := map[string]*pb.ProgressBar{} - - // wire up a subscription to print download progress to streams - bus.Subscribe(func(_ context.Context, typ event.Type, payload interface{}) error { - lock.Lock() - defer lock.Unlock() - - switch typ { - case event.ETRemoteClientPushVersionProgress: - if evt, ok := payload.(event.RemoteEvent); ok { - bar, exists := progress[evt.Ref.String()] - if !exists { - bar = pb.New(len(evt.Progress)) - bar.SetWriter(w) - bar.SetMaxWidth(80) - bar.Start() - progress[evt.Ref.String()] = bar - } - bar.SetCurrent(int64(evt.Progress.CompletedBlocks())) - } - case event.ETRemoteClientPushVersionCompleted: - if evt, ok := payload.(event.RemoteEvent); ok { - if bar, exists := progress[evt.Ref.String()]; exists { - bar.SetCurrent(bar.Total()) - bar.Finish() - delete(progress, evt.Ref.String()) - } - } - case event.ETRemoteClientPullVersionProgress: - if evt, ok := payload.(event.RemoteEvent); ok { - bar, exists := progress[evt.Ref.String()] - if !exists { - bar = pb.New(len(evt.Progress)) - bar.SetWriter(w) - bar.SetMaxWidth(80) - bar.Start() - progress[evt.Ref.String()] = bar - } - bar.SetCurrent(int64(evt.Progress.CompletedBlocks())) - } - case event.ETRemoteClientPullVersionCompleted: - if evt, ok := payload.(event.RemoteEvent); ok { - if bar, exists := progress[evt.Ref.String()]; exists { - bar.SetCurrent(bar.Total()) - bar.Finish() - delete(progress, evt.Ref.String()) - } - } - } - return nil - }, - event.ETRemoteClientPushVersionProgress, - event.ETRemoteClientPushVersionCompleted, - event.ETRemoteClientPullVersionProgress, - event.ETRemoteClientPullVersionCompleted, - ) -} diff --git a/remote/mock_client.go b/remote/mock_client.go index c998a00b4..10ccf1de9 100644 --- a/remote/mock_client.go +++ b/remote/mock_client.go @@ -187,7 +187,7 @@ func (c *MockClient) createTheirDataset(ctx context.Context, ref *dsref.Ref) err // Store with dsfs sw := dsfs.SaveSwitches{} - path, err := dsfs.CreateDataset(ctx, fs, fs.DefaultWriteFS(), &ds, nil, other.info.PrivKey, sw) + path, err := dsfs.CreateDataset(ctx, fs, fs.DefaultWriteFS(), event.NilBus, &ds, nil, other.info.PrivKey, sw) if err != nil { return err } diff --git a/repo/test/test_repo.go b/repo/test/test_repo.go index a36bc390b..77641801d 100644 --- a/repo/test/test_repo.go +++ b/repo/test/test_repo.go @@ -218,7 +218,7 @@ func createDataset(r repo.Repo, tc dstest.TestCase) (ref reporef.DatasetRef, err sw := dsfs.SaveSwitches{Pin: true, ShouldRender: true} fs := r.Filesystem() - if path, err = dsfs.CreateDataset(ctx, fs, fs.DefaultWriteFS(), ds, nil, r.PrivateKey(), sw); err != nil { + if path, err = dsfs.CreateDataset(ctx, fs, fs.DefaultWriteFS(), r.Bus(), ds, nil, r.PrivateKey(), sw); err != nil { return } if ds.PreviousPath != "" && ds.PreviousPath != "/" {