From 7d0cb91acaae0c6da76e39414abffdd4d4e45d5c Mon Sep 17 00:00:00 2001 From: b5 Date: Thu, 11 Feb 2021 11:40:33 -0500 Subject: [PATCH] feat(logbook): add methods for writing transform run ops Introduce a new type of branch-level operation to logbook that records details about a transform run. This commit adds two ways a Run-model operation can get into logbook. The first is via a new *run.State arg to `WriteVersionSave`: WriteVersionSave(ctx context.Context, initID string, ds *dataset.Dataset, rs *run.State) error Passing a non-nil run.State writes two operations that are connected by referencing the same RunID. When read back out into a dataset log these operations are combined into a single DatasetLogItem, which has new fields for describing runs. The second way to add a Run operation to a branch log is via `WriteTransformRun` WriteTransformRun(ctx context.Context, initID string, rs *run.State) error This is intended for where we'd like to record a run that didn't create a commit In this case we record a run op, but no commit op. The run op will contain details about the failure. All of this needs tests. --- event/event.go | 4 - logbook/logbook.go | 137 ++++++++++++++++++++++++++++++-- logbook/logbook_test.go | 28 +++---- logbook/logsync/logsync_test.go | 10 +-- logbook/temp_builder.go | 2 +- logbook/types.go | 2 +- remote/mock_client.go | 2 +- repo/test/test_repo.go | 2 +- 8 files changed, 152 insertions(+), 35 deletions(-) diff --git a/event/event.go b/event/event.go index c8c454b16..ae60270c9 100644 --- a/event/event.go +++ b/event/event.go @@ -164,10 +164,6 @@ func (b *bus) publish(ctx context.Context, typ Type, sessionID string, payload i Payload: payload, } - if b.closed { - return e, ErrBusClosed - } - // TODO(dustmop): Add instrumentation, perhaps to ctx, to make logging / tracing // a single event easier to do. diff --git a/logbook/logbook.go b/logbook/logbook.go index 6c8ec5445..58876955b 100644 --- a/logbook/logbook.go +++ b/logbook/logbook.go @@ -23,6 +23,7 @@ import ( "github.com/qri-io/qri/event" "github.com/qri-io/qri/logbook/oplog" "github.com/qri-io/qri/profile" + "github.com/qri-io/qri/transform/run" ) var ( @@ -56,14 +57,21 @@ const ( CommitModel // PushModel is the enum for a push model PushModel + // RunModel is the enum for transform execution + RunModel // ACLModel is the enum for a acl model ACLModel ) -// DefaultBranchName is the default name all branch-level logbook data is read -// from and written to. we currently don't present branches as a user-facing -// feature in qri, but logbook supports them -const DefaultBranchName = "main" +const ( + // DefaultBranchName is the default name all branch-level logbook data is read + // from and written to. we currently don't present branches as a user-facing + // feature in qri, but logbook supports them + DefaultBranchName = "main" + // runIDRelPrefix is a string prefix for op.Relations when recording commit ops + // that have a non-empty Commit.RunID field + runIDRelPrefix = "runID:" +) // ModelString gets a unique string descriptor for an integral model identifier func ModelString(m uint32) string { @@ -80,6 +88,8 @@ func ModelString(m uint32) string { return "push" case ACLModel: return "acl" + case RunModel: + return "run" default: return "" } @@ -492,9 +502,13 @@ func (book *Book) WriteDatasetDelete(ctx context.Context, initID string) error { return book.save(ctx) } -// WriteVersionSave adds an operation to a log marking the creation of a -// dataset version. Book will copy details from the provided dataset pointer -func (book *Book) WriteVersionSave(ctx context.Context, initID string, ds *dataset.Dataset) error { +// WriteVersionSave adds 1 or 2 operations marking the creation of a dataset +// version. If the run.State arg is nil only one commit operation is written +// +// If a run.State argument is non-nil two operations are written to the log, +// one op for the run followed by a commit op for the dataset save. +// If run.State is non-nil the dataset.Commit.RunID and rs.ID fields must match +func (book *Book) WriteVersionSave(ctx context.Context, initID string, ds *dataset.Dataset, rs *run.State) error { if book == nil { return ErrNoLogbook } @@ -509,6 +523,13 @@ func (book *Book) WriteVersionSave(ctx context.Context, initID string, ds *datas return err } + if rs != nil { + if rs.ID != ds.Commit.RunID { + return fmt.Errorf("dataset.Commit.RunID does not match the provided run.ID") + } + book.appendTransformRun(branchLog, rs) + } + topIndex := book.appendVersionSave(branchLog, ds) // TODO(dlong): Think about how to handle a failure exactly here, what needs to be rolled back? err = book.save(ctx) @@ -531,6 +552,33 @@ func (book *Book) WriteVersionSave(ctx context.Context, initID string, ds *datas return nil } +// WriteTransformRun adds an operation to a log marking the execution of a +// dataset transform script +func (book *Book) WriteTransformRun(ctx context.Context, initID string, rs *run.State) error { + if book == nil { + return ErrNoLogbook + } + + log.Debugf("WriteTransformRun: %s", initID) + branchLog, err := book.branchLog(ctx, initID) + if err != nil { + return err + } + + if err := book.hasWriteAccess(branchLog.l); err != nil { + return err + } + + book.appendTransformRun(branchLog, rs) + // TODO(dlong): Think about how to handle a failure exactly here, what needs to be rolled back? + err = book.save(ctx) + if err != nil { + return err + } + + return nil +} + func (book *Book) appendVersionSave(blog *BranchLog, ds *dataset.Dataset) int { op := oplog.Op{ Type: oplog.OpTypeInit, @@ -545,6 +593,27 @@ func (book *Book) appendVersionSave(blog *BranchLog, ds *dataset.Dataset) int { if ds.Structure != nil { op.Size = int64(ds.Structure.Length) } + if ds.Commit.RunID != "" { + op.Relations = []string{fmt.Sprintf("%s%s", runIDRelPrefix, ds.Commit.RunID)} + } + + blog.Append(op) + + return blog.Size() - 1 +} + +// appendTransformRun maps fields from run.State to an operation. +func (book *Book) appendTransformRun(blog *BranchLog, rs *run.State) int { + op := oplog.Op{ + Type: oplog.OpTypeInit, + Model: RunModel, + Ref: rs.ID, + Name: fmt.Sprintf("%d", rs.Number), + + Timestamp: rs.StartTime.UnixNano(), + Size: int64(rs.Duration), + Note: string(rs.Status), + } blog.Append(op) @@ -1018,6 +1087,24 @@ func (book *Book) ConstructDatasetLog(ctx context.Context, ref dsref.Ref, histor return book.save(ctx) } +func commitOpRunID(op oplog.Op) string { + for _, str := range op.Relations { + if strings.HasPrefix(str, runIDRelPrefix) { + return strings.TrimPrefix(str, runIDRelPrefix) + } + } + return "" +} + +// func versionInfoFromOp(ref dsref.Ref, op oplog.Op) dsref.VersionInfo { +// return dsref.VersionInfo{ +// Username: ref.Username, +// ProfileID: ref.ProfileID, +// Name: ref.Name, +// Path: op.Ref, +// CommitTime: time.Unix(0, op.Timestamp), +// BodySize: int(op.Size), + func versionInfoFromOp(ref dsref.Ref, op oplog.Op) dsref.VersionInfo { return dsref.VersionInfo{ Username: ref.Username, @@ -1030,6 +1117,27 @@ func versionInfoFromOp(ref dsref.Ref, op oplog.Op) dsref.VersionInfo { } } +func runItemFromOp(ref dsref.Ref, op oplog.Op) dsref.VersionInfo { + return dsref.VersionInfo{ + Username: ref.Username, + ProfileID: ref.ProfileID, + Name: ref.Name, + CommitTime: time.Unix(0, op.Timestamp), + RunID: op.Ref, + RunStatus: op.Note, + RunDuration: int(op.Size), + // TODO(B5): read run number, defaulting to -1 in the event of an error + // RunNumber: strconv.ParseInt(op.Name), + } +} + +func addCommitDetailsToRunItem(li dsref.VersionInfo, op oplog.Op) dsref.VersionInfo { + li.CommitTitle = op.Note + li.BodySize = int(op.Size) + li.Path = op.Ref + return li +} + // Items collapses the history of a dataset branch into linear log items func (book Book) Items(ctx context.Context, ref dsref.Ref, offset, limit int) ([]dsref.VersionInfo, error) { initID, err := book.RefToInitID(dsref.Ref{Username: ref.Username, Name: ref.Name}) @@ -1060,7 +1168,17 @@ func branchToVersionInfos(blog *BranchLog, ref dsref.Ref, offset, limit int, col case CommitModel: switch op.Type { case oplog.OpTypeInit: - refs = append(refs, versionInfoFromOp(ref, op)) + // run operations & commit operations often occur next to each other in + // the log. + // if the last item in the slice has a runID that matches a runID resource + // from this commit, combine them into one Log item that describes both + // the run and the save + commitRunID := commitOpRunID(op) + if commitRunID != "" && len(refs) > 0 && commitRunID == refs[len(refs)-1].RunID { + refs[len(refs)-1] = addCommitDetailsToRunItem(refs[len(refs)-1], op) + } else { + refs = append(refs, versionInfoFromOp(ref, op)) + } case oplog.OpTypeAmend: deleteAtEnd = 0 refs[len(refs)-1] = versionInfoFromOp(ref, op) @@ -1071,6 +1189,9 @@ func branchToVersionInfos(blog *BranchLog, ref dsref.Ref, offset, limit int, col deleteAtEnd += int(op.Size) } } + case RunModel: + // runs are only ever "init" op type + refs = append(refs, runItemFromOp(ref, op)) case PushModel: switch op.Type { case oplog.OpTypeInit: diff --git a/logbook/logbook_test.go b/logbook/logbook_test.go index 4c547f79e..228a8cce9 100644 --- a/logbook/logbook_test.go +++ b/logbook/logbook_test.go @@ -84,7 +84,7 @@ func Example() { // create a log record of the version of a dataset. In practice this'll be // part of the overall save routine that created the above ds variable - if err := book.WriteVersionSave(ctx, initID, ds); err != nil { + if err := book.WriteVersionSave(ctx, initID, ds, nil); err != nil { panic(err) } @@ -104,7 +104,7 @@ func Example() { } // once again, write to the log - if err := book.WriteVersionSave(ctx, initID, ds2); err != nil { + if err := book.WriteVersionSave(ctx, initID, ds2, nil); err != nil { panic(err) } @@ -147,7 +147,7 @@ func Example() { } // once again, write to the log - if err := book.WriteVersionSave(ctx, initID, ds3); err != nil { + if err := book.WriteVersionSave(ctx, initID, ds3, nil); err != nil { panic(err) } @@ -238,7 +238,7 @@ func TestNilCallable(t *testing.T) { if err = book.WriteVersionDelete(ctx, initID, 0); err != logbook.ErrNoLogbook { t.Errorf("expected '%s', got: %v", logbook.ErrNoLogbook, err) } - if err = book.WriteVersionSave(ctx, initID, nil); err != logbook.ErrNoLogbook { + if err = book.WriteVersionSave(ctx, initID, nil, nil); err != logbook.ErrNoLogbook { t.Errorf("expected '%s', got: %v", logbook.ErrNoLogbook, err) } if _, err = book.ResolveRef(ctx, nil); err != dsref.ErrRefNotFound { @@ -426,7 +426,7 @@ func TestWritePermissions(t *testing.T) { }, Path: "HashOfVersion1", } - if err := tr.Book.WriteVersionSave(ctx, initID, ds); !errors.Is(err, logbook.ErrAccessDenied) { + if err := tr.Book.WriteVersionSave(ctx, initID, ds, nil); !errors.Is(err, logbook.ErrAccessDenied) { t.Errorf("WriteVersionSave to an oplog the book author doesn't own must return a wrap of logbook.ErrAccessDenied") } if err := tr.Book.WriteVersionAmend(ctx, initID, ds); !errors.Is(err, logbook.ErrAccessDenied) { @@ -468,7 +468,7 @@ func TestPushModel(t *testing.T) { Title: "initial commit", }, Path: "HashOfVersion1", - }) + }, nil) if err != nil { t.Fatal(err) } @@ -639,7 +639,7 @@ func TestDatasetLogNaming(t *testing.T) { Title: "initial commit", }, Path: "HashOfVersion1", - }) + }, nil) if err != nil { t.Fatal(err) } @@ -1052,7 +1052,7 @@ func (tr *testRunner) WriteWorldBankExample(t *testing.T) string { PreviousPath: "", } - if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil { + if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil { panic(err) } @@ -1064,7 +1064,7 @@ func (tr *testRunner) WriteWorldBankExample(t *testing.T) string { ds.Path = "QmHashOfVersion2" ds.PreviousPath = "QmHashOfVersion1" - if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil { + if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil { t.Fatal(err) } @@ -1104,7 +1104,7 @@ func (tr *testRunner) WriteMoreWorldBankCommits(t *testing.T, initID string) { PreviousPath: "QmHashOfVersion3", } - if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil { + if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil { panic(err) } @@ -1119,7 +1119,7 @@ func (tr *testRunner) WriteMoreWorldBankCommits(t *testing.T, initID string) { PreviousPath: "QmHashOfVersion4", } - if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil { + if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil { panic(err) } } @@ -1156,7 +1156,7 @@ func (tr *testRunner) WriteRenameExample(t *testing.T) { PreviousPath: "", } - if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil { + if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil { panic(err) } @@ -1165,7 +1165,7 @@ func (tr *testRunner) WriteRenameExample(t *testing.T) { ds.Path = "QmHashOfVersion2" ds.PreviousPath = "QmHashOfVersion1" - if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil { + if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil { t.Fatal(err) } @@ -1296,7 +1296,7 @@ func GenerateExampleOplog(ctx context.Context, t *testing.T, journal *logbook.Bo }, Path: headPath, PreviousPath: "", - }) + }, nil) if err != nil { t.Fatal(err) } diff --git a/logbook/logsync/logsync_test.go b/logbook/logsync/logsync_test.go index c319ec543..f977eee9b 100644 --- a/logbook/logsync/logsync_test.go +++ b/logbook/logsync/logsync_test.go @@ -488,14 +488,14 @@ func writeNasdaqLogs(ctx context.Context, book *logbook.Book) (ref dsref.Ref, er PreviousPath: "", } - if err = book.WriteVersionSave(ctx, initID, ds); err != nil { + if err = book.WriteVersionSave(ctx, initID, ds, nil); err != nil { return ref, err } ds.Path = "v1" ds.PreviousPath = "v0" - if err = book.WriteVersionSave(ctx, initID, ds); err != nil { + if err = book.WriteVersionSave(ctx, initID, ds, nil); err != nil { return ref, err } @@ -529,21 +529,21 @@ func writeWorldBankLogs(ctx context.Context, book *logbook.Book) (ref dsref.Ref, PreviousPath: "", } - if err = book.WriteVersionSave(ctx, initID, ds); err != nil { + if err = book.WriteVersionSave(ctx, initID, ds, nil); err != nil { return ref, err } ds.Path = "/ipfs/QmVersion1" ds.PreviousPath = "/ipfs/QmVesion0" - if err = book.WriteVersionSave(ctx, initID, ds); err != nil { + if err = book.WriteVersionSave(ctx, initID, ds, nil); err != nil { return ref, err } ds.Path = "/ipfs/QmVersion2" ds.PreviousPath = "/ipfs/QmVersion1" - if err = book.WriteVersionSave(ctx, initID, ds); err != nil { + if err = book.WriteVersionSave(ctx, initID, ds, nil); err != nil { return ref, err } diff --git a/logbook/temp_builder.go b/logbook/temp_builder.go index 26635bdfd..758c6a31f 100644 --- a/logbook/temp_builder.go +++ b/logbook/temp_builder.go @@ -85,7 +85,7 @@ func (b *BookBuilder) Commit(ctx context.Context, t *testing.T, initID, title, i Path: ipfsHash, PreviousPath: ref.Path, } - if err := b.Book.WriteVersionSave(ctx, initID, &ds); err != nil { + if err := b.Book.WriteVersionSave(ctx, initID, &ds, nil); err != nil { t.Fatal(err) } b.Dsrefs[ref.Name] = append(b.Dsrefs[ref.Name], ipfsHash) diff --git a/logbook/types.go b/logbook/types.go index 2642c0711..a00250cde 100644 --- a/logbook/types.go +++ b/logbook/types.go @@ -76,7 +76,7 @@ func newBranchLog(l *oplog.Log) *BranchLog { // Append adds an op to the BranchLog func (blog *BranchLog) Append(op oplog.Op) { - if op.Model != BranchModel && op.Model != CommitModel && op.Model != PushModel { + if op.Model != BranchModel && op.Model != CommitModel && op.Model != PushModel && op.Model != RunModel { log.Errorf("cannot Append, incorrect model %d for BranchLog", op.Model) return } diff --git a/remote/mock_client.go b/remote/mock_client.go index 0b3f77965..b1cf799d3 100644 --- a/remote/mock_client.go +++ b/remote/mock_client.go @@ -199,7 +199,7 @@ func (c *MockClient) createTheirDataset(ctx context.Context, ref *dsref.Ref) err ref.Path = path // Add a save operation to logbook - err = other.book.WriteVersionSave(ctx, ref.InitID, &ds) + err = other.book.WriteVersionSave(ctx, ref.InitID, &ds, nil) if err != nil { return err } diff --git a/repo/test/test_repo.go b/repo/test/test_repo.go index 1442f0bcb..9a9e34802 100644 --- a/repo/test/test_repo.go +++ b/repo/test/test_repo.go @@ -260,7 +260,7 @@ func createDataset(r repo.Repo, tc dstest.TestCase) (ref reporef.DatasetRef, err return ref, err } } - if err = r.Logbook().WriteVersionSave(ctx, initID, ds); err != nil && err != logbook.ErrNoLogbook { + if err = r.Logbook().WriteVersionSave(ctx, initID, ds, nil); err != nil && err != logbook.ErrNoLogbook { return }