diff --git a/base/archive/archive_test.go b/base/archive/archive_test.go
index bd7c5177a..92ffff22a 100644
--- a/base/archive/archive_test.go
+++ b/base/archive/archive_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/qri-io/qfs"
 	"github.com/qri-io/qri/base/dsfs"
 	testPeers "github.com/qri-io/qri/config/test"
+	"github.com/qri-io/qri/event"
 )

 func TestGenerateFilename(t *testing.T) {
@@ -132,7 +133,7 @@ func testFS() (qfs.Filesystem, map[string]string, error) {
 	ds.SetBodyFile(dataf)

 	fs := qfs.NewMemFS()
-	dskey, err := dsfs.WriteDataset(ctx, nil, fs, ds, pk, dsfs.SaveSwitches{})
+	dskey, err := dsfs.WriteDataset(ctx, nil, fs, event.NilBus, ds, pk, dsfs.SaveSwitches{})
 	if err != nil {
 		return fs, ns, err
 	}
@@ -179,7 +180,7 @@ func testFSWithVizAndTransform() (qfs.Filesystem, map[string]string, error) {
 	privKey := testPeers.GetTestPeerInfo(10).PrivKey

 	var dsLk sync.Mutex
-	dskey, err := dsfs.WriteDataset(ctx, &dsLk, st, ds, privKey, dsfs.SaveSwitches{Pin: true})
+	dskey, err := dsfs.WriteDataset(ctx, &dsLk, st, event.NilBus, ds, privKey, dsfs.SaveSwitches{Pin: true})
 	if err != nil {
 		return st, ns, err
 	}
diff --git a/base/dsfs/commit.go b/base/dsfs/commit.go
index feebd2741..8e34f3250 100644
--- a/base/dsfs/commit.go
+++ b/base/dsfs/commit.go
@@ -16,6 +16,7 @@ import (
 	"github.com/qri-io/qfs"
 	"github.com/qri-io/qri/base/friendly"
 	"github.com/qri-io/qri/base/toqtype"
+	"github.com/qri-io/qri/event"
 )

 // Timestamp is an function for getting commit timestamps
@@ -62,13 +63,19 @@ func loadCommit(ctx context.Context, fs qfs.Filesystem, path string) (st *datase
 	return dataset.UnmarshalCommit(data)
 }

-func commitFileAddFunc(privKey crypto.PrivKey) addWriteFileFunc {
+func commitFileAddFunc(privKey crypto.PrivKey, pub event.Publisher) addWriteFileFunc {
 	return func(ds *dataset.Dataset, wfs *writeFiles) error {
 		if ds.Commit == nil {
 			return nil
 		}

 		hook := func(ctx context.Context, f qfs.File, added map[string]string) (io.Reader, error) {
+			go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveProgress, event.DsSaveEvent{
+				Username:   ds.Peername,
+				Name:       ds.Name,
+				Message:    "finalizing",
+				Completion: 0.9,
+			})
 			if cff, ok := wfs.body.(*computeFieldsFile); ok {
 				updateScriptPaths(ds, added)
diff --git a/base/dsfs/compute_fields.go b/base/dsfs/compute_fields.go
index 46945ef42..cc3ef9cae 100644
--- a/base/dsfs/compute_fields.go
+++ b/base/dsfs/compute_fields.go
@@ -14,6 +14,7 @@ import (
 	"github.com/qri-io/dataset/dsstats"
 	"github.com/qri-io/jsonschema"
 	"github.com/qri-io/qfs"
+	"github.com/qri-io/qri/event"
 )

 type computeFieldsFile struct {
@@ -48,7 +49,15 @@ var (
 	_ statsComponentFile = (*computeFieldsFile)(nil)
 )

-func newComputeFieldsFile(ctx context.Context, dsLk *sync.Mutex, fs qfs.Filesystem, pk crypto.PrivKey, ds, prev *dataset.Dataset, sw SaveSwitches) (qfs.File, error) {
+func newComputeFieldsFile(
+	ctx context.Context,
+	dsLk *sync.Mutex,
+	fs qfs.Filesystem,
+	pub event.Publisher,
+	pk crypto.PrivKey,
+	ds *dataset.Dataset,
+	prev *dataset.Dataset,
+	sw SaveSwitches) (qfs.File, error) {
 	var (
 		bf     = ds.BodyFile()
 		bfPrev qfs.File
@@ -84,7 +93,7 @@ func newComputeFieldsFile(ctx context.Context, dsLk *sync.Mutex, fs qfs.Filesyst
 		done: make(chan error),
 	}

-	go cff.handleRows(ctx)
+	go cff.handleRows(ctx, pub)

 	return cff, nil
 }
@@ -151,7 +160,7 @@ func (cff *computeFieldsFile) StatsComponent() (*dataset.Stats, error) {
 	}, nil
 }

-func (cff *computeFieldsFile) handleRows(ctx context.Context) {
+func (cff *computeFieldsFile) handleRows(ctx context.Context, pub event.Publisher) {
 	var (
 		batchBuf *dsio.EntryBuffer
 		st       = cff.ds.Structure
@@ -201,6 +210,16 @@ func (cff *computeFieldsFile) handleRows(ctx context.Context) {
 			return
 		}

+		// publish here so that if the user sees the "processing body file"
+		// message, we know a compute-fields-file has made it all the way
+		// through setup
+		go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveProgress, event.DsSaveEvent{
+			Username:   cff.ds.Peername,
+			Name:       cff.ds.Name,
+			Message:    "processing body file",
+			Completion: 0.1,
+		})
+
 		go func() {
 			err = dsio.EachEntry(r, func(i int, ent dsio.Entry, err error) error {
 				if err != nil {
diff --git a/base/dsfs/dataset.go b/base/dsfs/dataset.go
index 4b388932f..eaa914572 100644
--- a/base/dsfs/dataset.go
+++ b/base/dsfs/dataset.go
@@ -11,6 +11,7 @@ import (
 	"github.com/qri-io/dataset"
 	"github.com/qri-io/dataset/validate"
 	"github.com/qri-io/qfs"
+	"github.com/qri-io/qri/event"
 )

 // number of entries to per batch when processing body data in WriteDataset
@@ -142,6 +143,7 @@ func CreateDataset(
 	ctx context.Context,
 	source qfs.Filesystem,
 	destination qfs.Filesystem,
+	pub event.Publisher,
 	ds *dataset.Dataset,
 	prev *dataset.Dataset,
 	pk crypto.PrivKey,
@@ -192,17 +194,34 @@ func CreateDataset(
 	}

 	// lock for editing dataset pointer
-	var dsLk = &sync.Mutex{}
+	var (
+		dsLk     = &sync.Mutex{}
+		peername = ds.Peername
+		name     = ds.Name
+	)

-	bodyFile, err := newComputeFieldsFile(ctx, dsLk, source, pk, ds, prev, sw)
+	bodyFile, err := newComputeFieldsFile(ctx, dsLk, source, pub, pk, ds, prev, sw)
 	if err != nil {
 		return "", err
 	}
 	ds.SetBodyFile(bodyFile)

-	path, err := WriteDataset(ctx, dsLk, destination, ds, pk, sw)
+	go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveStarted, event.DsSaveEvent{
+		Username:   peername,
+		Name:       name,
+		Message:    "save started",
+		Completion: 0,
+	})
+
+	path, err := WriteDataset(ctx, dsLk, destination, pub, ds, pk, sw)
 	if err != nil {
 		log.Debug(err.Error())
+		event.PublishLogError(ctx, pub, log, event.ETDatasetSaveCompleted, event.DsSaveEvent{
+			Username:   peername,
+			Name:       name,
+			Error:      err,
+			Completion: 1.0,
+		})
 		return "", err
 	}

@@ -211,9 +230,22 @@ func CreateDataset(
 	// the caller doesn't use the ds arg afterward
 	// might make sense to have a wrapper function that writes and loads on success
 	if err := DerefDataset(ctx, destination, ds); err != nil {
+		event.PublishLogError(ctx, pub, log, event.ETDatasetSaveCompleted, event.DsSaveEvent{
+			Username:   peername,
+			Name:       name,
+			Error:      err,
+			Completion: 1.0,
+		})
 		return path, err
 	}
-	return path, nil
+
+	return path, pub.Publish(ctx, event.ETDatasetSaveCompleted, event.DsSaveEvent{
+		Username:   peername,
+		Name:       name,
+		Message:    "dataset saved",
+		Path:       path,
+		Completion: 1.0,
+	})
 }

 // WriteDataset persists a datasets to a destination filesystem
@@ -221,6 +253,7 @@ func WriteDataset(
 	ctx context.Context,
 	dsLk *sync.Mutex,
 	destination qfs.Filesystem,
+	pub event.Publisher,
 	ds *dataset.Dataset,
 	pk crypto.PrivKey,
 	sw SaveSwitches,
@@ -240,7 +273,7 @@ func WriteDataset(
 		addStatsFile,
 		addReadmeFile,
 		vizFilesAddFunc(destination, sw),
-		commitFileAddFunc(pk),
+		commitFileAddFunc(pk, pub),
 		addDatasetFile,
 	}
diff --git a/base/dsfs/dataset_test.go b/base/dsfs/dataset_test.go
index 19c073a2f..d966273f7 100644
--- a/base/dsfs/dataset_test.go
+++ b/base/dsfs/dataset_test.go
@@ -12,6 +12,7 @@ import (
 	"testing"
 	"time"

+	"github.com/google/go-cmp/cmp"
 	"github.com/qri-io/dataset"
 	"github.com/qri-io/dataset/dsio"
 	"github.com/qri-io/dataset/dstest"
@@ -20,6 +21,7 @@ import (
 	"github.com/qri-io/qfs"
"github.com/qri-io/qri/base/toqtype" testPeers "github.com/qri-io/qri/config/test" + "github.com/qri-io/qri/event" ) func TestLoadDataset(t *testing.T) { @@ -47,7 +49,7 @@ func TestLoadDataset(t *testing.T) { info := testPeers.GetTestPeerInfo(10) pk := info.PrivKey - apath, err := WriteDataset(ctx, &sync.Mutex{}, fs, ds, pk, SaveSwitches{}) + apath, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, ds, pk, SaveSwitches{}) if err != nil { t.Errorf(err.Error()) return @@ -156,7 +158,7 @@ func TestCreateDataset(t *testing.T) { t.Fatalf("creating test case: %s", err) } - _, err = CreateDataset(ctx, fs, fs, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true}) + _, err = CreateDataset(ctx, fs, fs, event.NilBus, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true}) if err == nil { t.Fatalf("CreateDataset expected error. got nil") } @@ -189,7 +191,7 @@ func TestCreateDataset(t *testing.T) { t.Fatalf("creating test case: %s", err) } - path, err := CreateDataset(ctx, fs, fs, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true}) + path, err := CreateDataset(ctx, fs, fs, event.NilBus, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true}) if err != nil { t.Fatalf("CreateDataset: %s", err) } @@ -218,7 +220,7 @@ func TestCreateDataset(t *testing.T) { } t.Run("no_priv_key", func(t *testing.T) { - _, err := CreateDataset(ctx, fs, fs, nil, nil, nil, SaveSwitches{ShouldRender: true}) + _, err := CreateDataset(ctx, fs, fs, event.NilBus, nil, nil, nil, SaveSwitches{ShouldRender: true}) if err == nil { t.Fatal("expected call without prvate key to error") } @@ -242,7 +244,7 @@ func TestCreateDataset(t *testing.T) { t.Errorf("case nil body and previous body files, error reading data file: %s", err.Error()) } expectedErr := "bodyfile or previous bodyfile needed" - _, err = CreateDataset(ctx, fs, fs, ds, nil, privKey, SaveSwitches{ShouldRender: true}) + _, err = CreateDataset(ctx, fs, fs, event.NilBus, ds, nil, privKey, SaveSwitches{ShouldRender: true}) if err.Error() != expectedErr { t.Errorf("case nil body and previous body files, error mismatch: expected '%s', got '%s'", expectedErr, err.Error()) } @@ -272,7 +274,7 @@ func TestCreateDataset(t *testing.T) { } ds.SetBodyFile(qfs.NewMemfileBytes("body.csv", bodyBytes)) - path, err := CreateDataset(ctx, fs, fs, ds, dsPrev, privKey, SaveSwitches{ShouldRender: true}) + path, err := CreateDataset(ctx, fs, fs, event.NilBus, ds, dsPrev, privKey, SaveSwitches{ShouldRender: true}) if err != nil && err.Error() != expectedErr { t.Fatalf("mismatch: expected %q, got %q", expectedErr, err.Error()) } else if err == nil { @@ -312,7 +314,7 @@ func TestDatasetSaveCustomTimestamp(t *testing.T) { } ds.SetBodyFile(qfs.NewMemfileBytes("/body.json", []byte(`[]`))) - path, err := CreateDataset(ctx, fs, fs, ds, nil, privKey, SaveSwitches{}) + path, err := CreateDataset(ctx, fs, fs, event.NilBus, ds, nil, privKey, SaveSwitches{}) if err != nil { t.Fatal(err) } @@ -324,6 +326,47 @@ func TestDatasetSaveCustomTimestamp(t *testing.T) { } } +func TestDatasetSaveEvents(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fs := qfs.NewMemFS() + privKey := testPeers.GetTestPeerInfo(10).PrivKey + bus := event.NewBus(ctx) + + fired := map[event.Type]int{} + bus.Subscribe(func(ctx context.Context, t event.Type, payload interface{}) error { + fired[t]++ + return nil + }, + event.ETDatasetSaveStarted, + event.ETDatasetSaveProgress, + event.ETDatasetSaveCompleted, + ) + + ds := &dataset.Dataset{ + Commit: &dataset.Commit{ + 
+			Timestamp: time.Date(2100, 1, 2, 3, 4, 5, 6, time.Local),
+		},
+		Structure: &dataset.Structure{Format: "json", Schema: dataset.BaseSchemaArray},
+	}
+	ds.SetBodyFile(qfs.NewMemfileBytes("/body.json", []byte(`[]`)))
+
+	if _, err := CreateDataset(ctx, fs, fs, bus, ds, nil, privKey, SaveSwitches{}); err != nil {
+		t.Fatal(err)
+	}
+
+	expect := map[event.Type]int{
+		event.ETDatasetSaveStarted:   1,
+		event.ETDatasetSaveProgress:  2,
+		event.ETDatasetSaveCompleted: 1,
+	}
+
+	if diff := cmp.Diff(expect, fired); diff != "" {
+		t.Errorf("fired event count mismatch. (-want +got):%s\n", diff)
+	}
+}
+
 // BaseTabularSchema is the base schema for tabular data
 // NOTE: Do not use if possible, prefer github.com/qri-io/dataset/tabular
 // TODO(dustmop): Possibly move this to tabular package
@@ -374,7 +417,7 @@ func TestCreateDatasetBodyTooLarge(t *testing.T) {
 	}
 	nextDs.SetBodyFile(qfs.NewMemfileBytes(testBodyPath, testBodyBytes))

-	path, err := CreateDataset(ctx, fs, fs, &nextDs, &prevDs, privKey, SaveSwitches{ShouldRender: true})
+	path, err := CreateDataset(ctx, fs, fs, event.NilBus, &nextDs, &prevDs, privKey, SaveSwitches{ShouldRender: true})
 	if err != nil {
 		t.Fatalf("CreateDataset: %s", err)
 	}
@@ -403,7 +446,7 @@ func TestWriteDataset(t *testing.T) {
 	info := testPeers.GetTestPeerInfo(10)
 	pk := info.PrivKey

-	if _, err := WriteDataset(ctx, &sync.Mutex{}, fs, &dataset.Dataset{}, pk, SaveSwitches{Pin: true}); err == nil || err.Error() != "cannot save empty dataset" {
+	if _, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, &dataset.Dataset{}, pk, SaveSwitches{Pin: true}); err == nil || err.Error() != "cannot save empty dataset" {
 		t.Errorf("didn't reject empty dataset: %s", err)
 	}
@@ -426,7 +469,7 @@ func TestWriteDataset(t *testing.T) {
 		ds := tc.Input

-		got, err := WriteDataset(ctx, &sync.Mutex{}, fs, ds, pk, SaveSwitches{Pin: true})
+		got, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, ds, pk, SaveSwitches{Pin: true})
 		if !(err == nil && c.err == "" || err != nil && err.Error() == c.err) {
 			t.Errorf("case %d error mismatch. expected: '%s', got: '%s'", i, c.err, err)
expected: '%s', got: '%s'", i, c.err, err) continue @@ -1011,7 +1054,7 @@ func BenchmarkCreateDatasetCSV(b *testing.B) { _, dataset := GenerateDataset(b, sampleSize, "csv") b.StartTimer() - _, err := CreateDataset(ctx, fs, fs, dataset, nil, privKey, SaveSwitches{ShouldRender: true}) + _, err := CreateDataset(ctx, fs, fs, event.NilBus, dataset, nil, privKey, SaveSwitches{ShouldRender: true}) if err != nil { b.Errorf("error creating dataset: %s", err.Error()) } diff --git a/base/dsfs/testdata_test.go b/base/dsfs/testdata_test.go index 771d40ee3..af3ea706e 100644 --- a/base/dsfs/testdata_test.go +++ b/base/dsfs/testdata_test.go @@ -10,6 +10,7 @@ import ( "github.com/qri-io/dataset" "github.com/qri-io/qfs" testPeers "github.com/qri-io/qri/config/test" + "github.com/qri-io/qri/event" ) var AirportCodes = &dataset.Dataset{ @@ -178,7 +179,7 @@ func makeFilestore() (map[string]string, qfs.Filesystem, error) { ds.SetBodyFile(qfs.NewMemfileBytes(fmt.Sprintf("/body.%s", ds.Structure.Format), data)) - dskey, err := WriteDataset(ctx, &sync.Mutex{}, fs, ds, pk, SaveSwitches{Pin: true}) + dskey, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, ds, pk, SaveSwitches{Pin: true}) if err != nil { return datasets, nil, fmt.Errorf("dataset: %s write error: %s", k, err.Error()) } diff --git a/base/dsfs/transform_test.go b/base/dsfs/transform_test.go index e4587b96f..1bf1cb05b 100644 --- a/base/dsfs/transform_test.go +++ b/base/dsfs/transform_test.go @@ -10,6 +10,7 @@ import ( "github.com/qri-io/dataset/dstest" "github.com/qri-io/qfs" testPeers "github.com/qri-io/qri/config/test" + "github.com/qri-io/qri/event" ) func TestLoadTransform(t *testing.T) { @@ -45,7 +46,7 @@ func TestLoadTransformScript(t *testing.T) { t.Fatal(err.Error()) } - path, err := CreateDataset(ctx, fs, fs, tc.Input, nil, privKey, SaveSwitches{Pin: true, ShouldRender: true}) + path, err := CreateDataset(ctx, fs, fs, event.NilBus, tc.Input, nil, privKey, SaveSwitches{Pin: true, ShouldRender: true}) if err != nil { t.Fatal(err.Error()) } @@ -64,7 +65,7 @@ func TestLoadTransformScript(t *testing.T) { t.Fatal(err.Error()) } tc.Input.Transform.ScriptPath = transformPath - path, err = CreateDataset(ctx, fs, fs, tc.Input, nil, privKey, SaveSwitches{Pin: true, ShouldRender: true}) + path, err = CreateDataset(ctx, fs, fs, event.NilBus, tc.Input, nil, privKey, SaveSwitches{Pin: true, ShouldRender: true}) if err != nil { t.Fatal(err.Error()) } diff --git a/base/save.go b/base/save.go index a18c47e6e..bc4375793 100644 --- a/base/save.go +++ b/base/save.go @@ -146,7 +146,7 @@ func CreateDataset(ctx context.Context, r repo.Repo, writeDest qfs.Filesystem, d return } - if path, err = dsfs.CreateDataset(ctx, r.Filesystem(), writeDest, ds, dsPrev, r.PrivateKey(), sw); err != nil { + if path, err = dsfs.CreateDataset(ctx, r.Filesystem(), writeDest, r.Bus(), ds, dsPrev, r.PrivateKey(), sw); err != nil { log.Debugf("dsfs.CreateDataset: %s", err) return nil, err } diff --git a/base/save_test.go b/base/save_test.go index 816e663c5..9dcf7613c 100644 --- a/base/save_test.go +++ b/base/save_test.go @@ -1,6 +1,7 @@ package base import ( + "bytes" "context" "testing" @@ -97,14 +98,16 @@ func TestSaveDatasetReplace(t *testing.T) { } func TestCreateDataset(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fs, err := muxfs.New(ctx, []qfs.Config{ {Type: "mem"}, }) if err != nil { t.Fatal(err) } - r, err := repo.NewMemRepoWithProfile(ctx, testPeerProfile, fs, event.NilBus) + r, err := 
 	if err != nil {
 		t.Fatal(err.Error())
 	}
@@ -120,6 +123,7 @@ func TestCreateDataset(t *testing.T) {
 		},
 	}
 	ds.SetBodyFile(qfs.NewMemfileBytes("/body.json", []byte("[]")))
+	PrintProgressBarsOnSave(&bytes.Buffer{}, r.Bus())

 	if _, err := CreateDataset(ctx, r, r.Filesystem().DefaultWriteFS(), &dataset.Dataset{}, &dataset.Dataset{}, SaveSwitches{Pin: true, ShouldRender: true}); err == nil {
 		t.Error("expected bad dataset to error")
diff --git a/cmd/print.go b/cmd/print.go
index dd332923e..5ce8c58c8 100644
--- a/cmd/print.go
+++ b/cmd/print.go
@@ -2,19 +2,25 @@ package cmd

 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"io"
+	"math"
 	"os"
 	"os/exec"
 	"runtime"
 	"strings"
+	"sync"

 	"github.com/fatih/color"
 	"github.com/olekukonko/tablewriter"
 	"github.com/qri-io/deepdiff"
 	qrierr "github.com/qri-io/qri/errors"
+	"github.com/qri-io/qri/event"
 	"github.com/qri-io/qri/lib"
+	"github.com/vbauerster/mpb/v5"
+	"github.com/vbauerster/mpb/v5/decor"
 )

 var noPrompt = false
@@ -230,3 +236,112 @@ func renderTable(writer io.Writer, header []string, data [][]string) {
 	table.AppendBulk(data)
 	table.Render()
 }
+
+// PrintProgressBarsOnEvents writes save progress data to the given writer
+func PrintProgressBarsOnEvents(w io.Writer, bus event.Bus) {
+	var lock sync.Mutex
+	// initialize progress container, with custom width
+	p := mpb.New(mpb.WithWidth(80))
+	progress := map[string]*mpb.Bar{}
+
+	// wire up a subscription that prints save & push/pull progress to streams
+	bus.Subscribe(func(_ context.Context, typ event.Type, payload interface{}) error {
+		lock.Lock()
+		defer lock.Unlock()
+
+		switch evt := payload.(type) {
+		case event.DsSaveEvent:
+			evtID := fmt.Sprintf("%s/%s", evt.Username, evt.Name)
+			cpl := int64(math.Ceil(evt.Completion * 100))
+
+			switch typ {
+			case event.ETDatasetSaveStarted:
+				bar, exists := progress[evtID]
+				if !exists {
+					bar = addElapsedBar(p, 100, "saving")
+					progress[evtID] = bar
+				}
+				bar.SetCurrent(cpl)
+			case event.ETDatasetSaveProgress:
+				bar, exists := progress[evtID]
+				if !exists {
+					bar = addElapsedBar(p, 100, "saving")
+					progress[evtID] = bar
+				}
+				bar.SetCurrent(cpl)
+			case event.ETDatasetSaveCompleted:
+				if bar, exists := progress[evtID]; exists {
+					bar.SetTotal(100, true)
+					delete(progress, evtID)
+				}
+			}
+		case event.RemoteEvent:
+			switch typ {
+			case event.ETRemoteClientPushVersionProgress:
+				bar, exists := progress[evt.Ref.String()]
+				if !exists {
+					bar = addBar(p, int64(len(evt.Progress)), "pushing")
+					progress[evt.Ref.String()] = bar
+				}
+				bar.SetCurrent(int64(evt.Progress.CompletedBlocks()))
+			case event.ETRemoteClientPushVersionCompleted:
+				if bar, exists := progress[evt.Ref.String()]; exists {
+					bar.SetCurrent(int64(len(evt.Progress)))
+					delete(progress, evt.Ref.String())
+				}
+
+			case event.ETRemoteClientPullVersionProgress:
+				bar, exists := progress[evt.Ref.String()]
+				if !exists {
+					bar = addBar(p, int64(len(evt.Progress)), "pulling")
+					progress[evt.Ref.String()] = bar
+				}
+				bar.SetCurrent(int64(evt.Progress.CompletedBlocks()))
+			case event.ETRemoteClientPullVersionCompleted:
+				if bar, exists := progress[evt.Ref.String()]; exists {
+					bar.SetCurrent(int64(len(evt.Progress)))
+					delete(progress, evt.Ref.String())
+				}
+			}
+		}
+
+		if len(progress) == 0 {
+			p.Wait()
+			p = mpb.New(mpb.WithWidth(80))
+		}
+		return nil
+	},
+		event.ETDatasetSaveStarted,
+		event.ETDatasetSaveProgress,
+		event.ETDatasetSaveCompleted,
+
+		event.ETRemoteClientPushVersionProgress,
+		event.ETRemoteClientPushVersionCompleted,
+		event.ETRemoteClientPullVersionProgress,
+		event.ETRemoteClientPullVersionCompleted,
+	)
+}
+
+func addBar(p *mpb.Progress, total int64, title string) *mpb.Bar {
+	return p.AddBar(100,
+		mpb.PrependDecorators(
+			// display our name with one space on the right
+			decor.Name(title, decor.WC{W: len(title) + 1, C: decor.DidentRight}),
+			// replace ETA decorator with "done" message, OnComplete event
+			decor.OnComplete(
+				decor.AverageETA(decor.ET_STYLE_GO, decor.WC{W: 4}), "done",
+			),
+		))
+}
+
+func addElapsedBar(p *mpb.Progress, total int64, title string) *mpb.Bar {
+	return p.AddBar(100,
+		mpb.PrependDecorators(
+			// display our name with one space on the right
+			decor.Name(title, decor.WC{W: len(title) + 1, C: decor.DidentRight}),
+			// replace elapsed-time decorator with "done" message, OnComplete event
+			decor.OnComplete(
+				decor.Elapsed(decor.ET_STYLE_GO, decor.WC{W: 4}), "done",
+			),
+		))
+}
diff --git a/cmd/qri.go b/cmd/qri.go
index db99d8318..00ae2104a 100644
--- a/cmd/qri.go
+++ b/cmd/qri.go
@@ -161,6 +161,14 @@ func (o *QriOptions) Init() (err error) {
 	}
 	setNoColor(!shouldColorOutput)

+	// TODO (b5) - this is a hack to make progress bars not show up while running
+	// tests. It does have the real-world implication that "shouldColorOutput"
+	// being false also disables progress bars, which may be what we want (ahem: TTY
+	// detection), but even if so, isn't the right use of this variable name
+	if shouldColorOutput {
+		PrintProgressBarsOnEvents(o.IOStreams.ErrOut, o.inst.Bus())
+	}
+
 	log.Debugf("running cmd %q", os.Args)

 	return
diff --git a/cmd/save.go b/cmd/save.go
index 1bdc0627b..c35f15e65 100644
--- a/cmd/save.go
+++ b/cmd/save.go
@@ -145,9 +145,6 @@ func (o *SaveOptions) Validate() error {
 func (o *SaveOptions) Run() (err error) {
 	printRefSelect(o.ErrOut, o.Refs)

-	o.StartSpinner()
-	defer o.StopSpinner()
-
 	p := &lib.SaveParams{
 		Ref:      o.Refs.Ref(),
 		BodyPath: o.BodyPath,
@@ -169,17 +166,12 @@ func (o *SaveOptions) Run() (err error) {
 	}

 	if o.Secrets != nil {
-		// Stop the spinner so the user can see the prompt, and the answer they type will
-		// not be erased. Output the message to error stream in case stdout is captured.
-		o.StopSpinner()
 		if !confirm(o.ErrOut, o.In, `
Warning: You are providing secrets to a dataset transformation.
Never provide secrets to a transformation you do not trust.
continue?`, true) {
 			return
 		}
-		// Restart the spinner.
-		o.StartSpinner()
 		if p.Secrets, err = parseSecrets(o.Secrets...); err != nil {
 			return err
 		}
@@ -190,7 +182,6 @@ continue?`, true) {
 		return err
 	}

-	o.StopSpinner()
 	ref := dsref.ConvertDatasetToVersionInfo(res).SimpleRef()
 	ref.ProfileID = ""
 	printSuccess(o.ErrOut, "dataset saved: %s", ref.String())
diff --git a/event/dataset.go b/event/dataset.go
index 3c91c6b0e..52b1b9d39 100644
--- a/event/dataset.go
+++ b/event/dataset.go
@@ -20,6 +20,19 @@ const (
 	// ETDatasetCreateLink is when a dataset is linked to a working directory
 	// payload is a DsChange
 	ETDatasetCreateLink = Type("dataset:CreateLink")
+
+	// ETDatasetSaveStarted fires when saving a dataset starts
+	// subscriptions do not block the publisher
+	// payload will be a DsSaveEvent
+	ETDatasetSaveStarted = Type("dataset:SaveStarted")
+	// ETDatasetSaveProgress indicates a change in progress of dataset version
+	// creation.
+	// subscriptions do not block the publisher
+	// payload will be a DsSaveEvent
+	ETDatasetSaveProgress = Type("dataset:SaveProgress")
+	// ETDatasetSaveCompleted indicates creating a dataset version finished
+	// payload will be a DsSaveEvent
+	ETDatasetSaveCompleted = Type("dataset:SaveCompleted")
 )

 // DsChange represents the result of a change to a dataset
@@ -33,3 +46,18 @@ type DsChange struct {
 	Info *dsref.VersionInfo `json:"info"`
 	Dir  string             `json:"dir"`
 }
+
+// DsSaveEvent represents a change in version creation progress
+type DsSaveEvent struct {
+	Username string `json:"username"`
+	Name     string `json:"name"`
+	// either message or error will be populated. message should be a human-centric
+	// description of progress
+	Message string `json:"message"`
+	// saving error. only populated on a failed ETDatasetSaveCompleted event
+	Error error `json:"error,omitempty"`
+	// completion pct from 0-1
+	Completion float64 `json:"complete"`
+	// only populated on successful ETDatasetSaveCompleted
+	Path string `json:"path,omitempty"`
+}
diff --git a/event/event.go b/event/event.go
index b2c55f14a..23d872c59 100644
--- a/event/event.go
+++ b/event/event.go
@@ -40,16 +40,26 @@ type Publisher interface {
 	Publish(ctx context.Context, t Type, payload interface{}) error
 }

+// PublishLogError is a convenience function that fires an event to a publisher
+// and logs any error using an external logger. Exists to make publishing in a
+// goroutine a one-liner, eg:
+// go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveStarted, payload)
+func PublishLogError(ctx context.Context, pub Publisher, logger golog.StandardLogger, t Type, payload interface{}) {
+	if err := pub.Publish(ctx, t, payload); err != nil {
+		logger.Debug(err)
+	}
+}
+
 // Bus is a central coordination point for event publication and subscription
-// zero or more subscribers register topics to be notified of, a publisher
+// zero or more subscribers register eventTypes to be notified of, a publisher
 // writes a topic event to the bus, which broadcasts to all subscribers of that
 // topic
 type Bus interface {
 	// Publish an event to the bus
 	Publish(ctx context.Context, t Type, data interface{}) error
-	// Subscribe to one or more topics with a handler function that will be called
+	// Subscribe to one or more eventTypes with a handler function that will be called
 	// whenever the event topic is published
-	Subscribe(handler Handler, topics ...Type)
+	Subscribe(handler Handler, eventTypes ...Type)
 	// NumSubscriptions returns the number of subscribers to the bus's events
 	NumSubscribers() int
 }
@@ -68,7 +78,7 @@ func (nilBus) Publish(_ context.Context, _ Type, _ interface{}) error {
 	return nil
 }

-func (nilBus) Subscribe(handler Handler, topics ...Type) {}
+func (nilBus) Subscribe(handler Handler, eventTypes ...Type) {}

 func (nilBus) NumSubscribers() int {
 	return 0
@@ -108,6 +118,7 @@ func NewBus(ctx context.Context) Bus {
 func (b *bus) Publish(ctx context.Context, topic Type, data interface{}) error {
 	b.lk.RLock()
 	defer b.lk.RUnlock()
+	log.Debugw("publish", "topic", topic, "payload", data)

 	if b.closed {
 		return ErrBusClosed
@@ -123,12 +134,12 @@ func (b *bus) Publish(ctx context.Context, topic Type, data interface{}) error {
 }

 // Subscribe requests events from the given topic, returning a channel of those events
-func (b *bus) Subscribe(handler Handler, topics ...Type) {
+func (b *bus) Subscribe(handler Handler, eventTypes ...Type) {
 	b.lk.Lock()
 	defer b.lk.Unlock()
-	log.Debugf("Subscribe: %v", topics)
+	log.Debugf("Subscribe: %v", eventTypes)

-	for _, topic := range topics {
+	for _, topic := range eventTypes {
 		b.subs[topic] = append(b.subs[topic], handler)
 	}
 }
diff --git a/go.mod b/go.mod
index 3644e4add..6e400d3ac 100644
--- a/go.mod
+++ b/go.mod
@@ -53,10 +53,12 @@ require (
 	github.com/spf13/cobra v1.0.0
 	github.com/theckman/go-flock v0.7.1
 	github.com/ugorji/go/codec v1.1.7
+	github.com/vbauerster/mpb v3.4.0+incompatible
+	github.com/vbauerster/mpb/v5 v5.3.0
 	go.starlark.net v0.0.0-20200619143648-50ca820fafb9
 	golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
 	golang.org/x/net v0.0.0-20200707034311-ab3426394381
-	golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae
+	golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed
 	golang.org/x/text v0.3.3
 	gopkg.in/yaml.v2 v2.3.0
 	nhooyr.io/websocket v1.8.6
diff --git a/go.sum b/go.sum
index c5b0aa732..329e1b44f 100644
--- a/go.sum
+++ b/go.sum
@@ -43,6 +43,8 @@ github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cB
 github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s=
 github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM=
 github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
+github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
+github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
 github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
 github.com/alangpierce/go-forceexport v0.0.0-20160317203124-8f1d6941cd75/go.mod h1:uAXEEpARkRhCZfEvy/y0Jcc888f9tHCc1W7/UeEtreE=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -1242,6 +1244,11 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
 github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
 github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
 github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
+github.com/vbauerster/mpb v1.1.3 h1:IRgic8VFaURXkW0VxDLkNOiNaAgtw0okB2YIaVvJDI4=
+github.com/vbauerster/mpb v3.4.0+incompatible h1:mfiiYw87ARaeRW6x5gWwYRUawxaW1tLAD8IceomUCNw=
+github.com/vbauerster/mpb v3.4.0+incompatible/go.mod h1:zAHG26FUhVKETRu+MWqYXcI70POlC6N8up9p1dID7SU=
+github.com/vbauerster/mpb/v5 v5.3.0 h1:vgrEJjUzHaSZKDRRxul5Oh4C72Yy/5VEMb0em+9M0mQ=
+github.com/vbauerster/mpb/v5 v5.3.0/go.mod h1:4yTkvAb8Cm4eylAp6t0JRq6pXDkFJ4krUlDqWYkakAs=
 github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
 github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
 github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE=
@@ -1482,6 +1489,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo=
 golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed h1:WBkVNH1zd9jg/dK4HCM4lNANnmd12EHC9z+LmcCG4ns=
+golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/lib/lib.go b/lib/lib.go
index 60aea7513..beeb0c6f4 100644
--- a/lib/lib.go
+++ b/lib/lib.go
@@ -446,6 +446,7 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
 	if inst.repo == nil {
 		if inst.repo, err = buildrepo.New(ctx, inst.repoPath, cfg, func(o *buildrepo.Options) {
+			o.Bus = inst.bus
 			o.Filesystem = inst.qfs
 			o.Profiles = inst.profiles
 			o.Logbook = inst.logbook
@@ -456,6 +457,10 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
 		}
 	}

+	// Try to make the repo a hidden directory, but it's okay if we can't. Ignore the error.
+	_ = hiddenfile.SetFileHidden(inst.repoPath)
+	inst.fsi = fsi.NewFSI(inst.repo, inst.bus)
+
 	if o.statsCache != nil {
 		inst.stats = stats.New(o.statsCache)
 	} else if inst.stats == nil {
 		}
 	}

-	if inst.repo != nil {
-		// Try to make the repo a hidden directory, but it's okay if we can't. Ignore the error.
-		_ = hiddenfile.SetFileHidden(inst.repoPath)
-		inst.fsi = fsi.NewFSI(inst.repo, inst.bus)
-	}
-
 	if inst.dscache == nil {
 		inst.dscache, err = newDscache(ctx, inst.qfs, inst.bus, pro.Peername, inst.repoPath)
 		if err != nil {
@@ -492,7 +491,7 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
 	// Check if this is coming from a test, which is requesting a MockRemoteClient.
 	key := InstanceContextKey("RemoteClient")
 	if v := ctx.Value(key); v != nil && v == "mock" && inst.node != nil {
-		inst.node.LocalStreams = o.Streams
+		inst.node.LocalStreams = inst.streams
 		if inst.remoteClient, err = remote.NewMockClient(ctx, inst.node, inst.logbook); err != nil {
 			return
 		}
@@ -503,14 +502,13 @@
 		}()
 	} else if inst.node != nil {
-		inst.node.LocalStreams = o.Streams
+		inst.node.LocalStreams = inst.streams
 		if _, e := inst.node.IPFSCoreAPI(); e == nil {
 			if inst.remoteClient, err = remote.NewClient(ctx, inst.node, inst.bus); err != nil {
 				log.Error("initializing remote client:", err.Error())
 				return
 			}
-			remote.PrintProgressBarsOnPushPull(inst.streams.ErrOut, inst.bus)
 			go func() {
 				inst.releasers.Add(1)
 				<-inst.remoteClient.Done()
@@ -874,6 +872,14 @@ func (inst *Instance) RemoteClient() remote.Client {
 	return inst.remoteClient
 }

+// Bus exposes the instance event bus
+func (inst *Instance) Bus() event.Bus {
+	if inst == nil {
+		return nil
+	}
+	return inst.bus
+}
+
 // checkRPCError validates RPC errors and in case of EOF returns a
 // more user friendly message
 func checkRPCError(err error) error {
diff --git a/lib/load_test.go b/lib/load_test.go
index ea0949069..3f003b6b2 100644
--- a/lib/load_test.go
+++ b/lib/load_test.go
@@ -7,6 +7,7 @@ import (
 	"github.com/qri-io/qri/base/dsfs"
 	"github.com/qri-io/qri/dsref"
 	dsrefspec "github.com/qri-io/qri/dsref/spec"
+	"github.com/qri-io/qri/event"
 )

 func TestLoadDataset(t *testing.T) {
@@ -24,6 +25,7 @@ func TestLoadDataset(t *testing.T) {
 		tr.Ctx,
 		fs,
 		fs.DefaultWriteFS(),
+		event.NilBus,
 		ds,
 		nil,
 		tr.Instance.repo.PrivateKey(),
diff --git a/remote/client.go b/remote/client.go
index 0fa5f1311..7a45c7bdb 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -4,15 +4,12 @@ import (
 	"context"
"encoding/json" "fmt" - "io" "io/ioutil" "net/http" "net/url" "strings" - "sync" "time" - "github.com/cheggaaa/pb/v3" coreiface "github.com/ipfs/interface-go-ipfs-core" crypto "github.com/libp2p/go-libp2p-core/crypto" peer "github.com/libp2p/go-libp2p-core/peer" @@ -754,65 +751,3 @@ func (c *client) Done() <-chan struct{} { func (c *client) DoneErr() error { return c.doneErr } - -// PrintProgressBarsOnPushPull writes progress data to the given writer on -// push & pull. requires the event bus that a remote.Client is publishing on -func PrintProgressBarsOnPushPull(w io.Writer, bus event.Bus) { - var lock sync.Mutex - progress := map[string]*pb.ProgressBar{} - - // wire up a subscription to print download progress to streams - bus.Subscribe(func(_ context.Context, typ event.Type, payload interface{}) error { - lock.Lock() - defer lock.Unlock() - - switch typ { - case event.ETRemoteClientPushVersionProgress: - if evt, ok := payload.(event.RemoteEvent); ok { - bar, exists := progress[evt.Ref.String()] - if !exists { - bar = pb.New(len(evt.Progress)) - bar.SetWriter(w) - bar.SetMaxWidth(80) - bar.Start() - progress[evt.Ref.String()] = bar - } - bar.SetCurrent(int64(evt.Progress.CompletedBlocks())) - } - case event.ETRemoteClientPushVersionCompleted: - if evt, ok := payload.(event.RemoteEvent); ok { - if bar, exists := progress[evt.Ref.String()]; exists { - bar.SetCurrent(bar.Total()) - bar.Finish() - delete(progress, evt.Ref.String()) - } - } - case event.ETRemoteClientPullVersionProgress: - if evt, ok := payload.(event.RemoteEvent); ok { - bar, exists := progress[evt.Ref.String()] - if !exists { - bar = pb.New(len(evt.Progress)) - bar.SetWriter(w) - bar.SetMaxWidth(80) - bar.Start() - progress[evt.Ref.String()] = bar - } - bar.SetCurrent(int64(evt.Progress.CompletedBlocks())) - } - case event.ETRemoteClientPullVersionCompleted: - if evt, ok := payload.(event.RemoteEvent); ok { - if bar, exists := progress[evt.Ref.String()]; exists { - bar.SetCurrent(bar.Total()) - bar.Finish() - delete(progress, evt.Ref.String()) - } - } - } - return nil - }, - event.ETRemoteClientPushVersionProgress, - event.ETRemoteClientPushVersionCompleted, - event.ETRemoteClientPullVersionProgress, - event.ETRemoteClientPullVersionCompleted, - ) -} diff --git a/remote/mock_client.go b/remote/mock_client.go index c998a00b4..10ccf1de9 100644 --- a/remote/mock_client.go +++ b/remote/mock_client.go @@ -187,7 +187,7 @@ func (c *MockClient) createTheirDataset(ctx context.Context, ref *dsref.Ref) err // Store with dsfs sw := dsfs.SaveSwitches{} - path, err := dsfs.CreateDataset(ctx, fs, fs.DefaultWriteFS(), &ds, nil, other.info.PrivKey, sw) + path, err := dsfs.CreateDataset(ctx, fs, fs.DefaultWriteFS(), event.NilBus, &ds, nil, other.info.PrivKey, sw) if err != nil { return err } diff --git a/repo/test/test_repo.go b/repo/test/test_repo.go index a36bc390b..77641801d 100644 --- a/repo/test/test_repo.go +++ b/repo/test/test_repo.go @@ -218,7 +218,7 @@ func createDataset(r repo.Repo, tc dstest.TestCase) (ref reporef.DatasetRef, err sw := dsfs.SaveSwitches{Pin: true, ShouldRender: true} fs := r.Filesystem() - if path, err = dsfs.CreateDataset(ctx, fs, fs.DefaultWriteFS(), ds, nil, r.PrivateKey(), sw); err != nil { + if path, err = dsfs.CreateDataset(ctx, fs, fs.DefaultWriteFS(), r.Bus(), ds, nil, r.PrivateKey(), sw); err != nil { return } if ds.PreviousPath != "" && ds.PreviousPath != "/" {