Skip to content

Commit

Permalink
feat(logbook): add methods for writing transform run ops
Browse files Browse the repository at this point in the history
Introduce a new type of branch-level operation to logbook that records details
about a transform run. This commit adds two ways a Run-model operation can get
into logbook. The first is via a new *run.State arg to `WriteVersionSave`:

  WriteVersionSave(ctx context.Context, initID string, ds *dataset.Dataset, rs *run.State) error

Passing a non-nil run.State writes two operations that are connected by
referencing the same RunID. When read back out into a dataset log these
operations are combined into a single DatasetLogItem, which has new fields for
describing runs.

The second way to add a Run operation to a branch log is via `WriteTransformRun`

  WriteTransformRun(ctx context.Context, initID string, rs *run.State) error

This is intended for where we'd like to record a run that didn't create a commit
In this case we record a run op, but no commit op. The run op will contain
details about the failure.

All of this needs tests.
  • Loading branch information
b5 committed Feb 17, 2021
1 parent 32dfe7d commit 7d0cb91
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 35 deletions.
4 changes: 0 additions & 4 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,6 @@ func (b *bus) publish(ctx context.Context, typ Type, sessionID string, payload i
Payload: payload,
}

if b.closed {
return e, ErrBusClosed
}

// TODO(dustmop): Add instrumentation, perhaps to ctx, to make logging / tracing
// a single event easier to do.

Expand Down
137 changes: 129 additions & 8 deletions logbook/logbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/logbook/oplog"
"github.com/qri-io/qri/profile"
"github.com/qri-io/qri/transform/run"
)

var (
Expand Down Expand Up @@ -56,14 +57,21 @@ const (
CommitModel
// PushModel is the enum for a push model
PushModel
// RunModel is the enum for transform execution
RunModel
// ACLModel is the enum for a acl model
ACLModel
)

// DefaultBranchName is the default name all branch-level logbook data is read
// from and written to. we currently don't present branches as a user-facing
// feature in qri, but logbook supports them
const DefaultBranchName = "main"
const (
// DefaultBranchName is the default name all branch-level logbook data is read
// from and written to. we currently don't present branches as a user-facing
// feature in qri, but logbook supports them
DefaultBranchName = "main"
// runIDRelPrefix is a string prefix for op.Relations when recording commit ops
// that have a non-empty Commit.RunID field
runIDRelPrefix = "runID:"
)

// ModelString gets a unique string descriptor for an integral model identifier
func ModelString(m uint32) string {
Expand All @@ -80,6 +88,8 @@ func ModelString(m uint32) string {
return "push"
case ACLModel:
return "acl"
case RunModel:
return "run"
default:
return ""
}
Expand Down Expand Up @@ -492,9 +502,13 @@ func (book *Book) WriteDatasetDelete(ctx context.Context, initID string) error {
return book.save(ctx)
}

// WriteVersionSave adds an operation to a log marking the creation of a
// dataset version. Book will copy details from the provided dataset pointer
func (book *Book) WriteVersionSave(ctx context.Context, initID string, ds *dataset.Dataset) error {
// WriteVersionSave adds 1 or 2 operations marking the creation of a dataset
// version. If the run.State arg is nil only one commit operation is written
//
// If a run.State argument is non-nil two operations are written to the log,
// one op for the run followed by a commit op for the dataset save.
// If run.State is non-nil the dataset.Commit.RunID and rs.ID fields must match
func (book *Book) WriteVersionSave(ctx context.Context, initID string, ds *dataset.Dataset, rs *run.State) error {
if book == nil {
return ErrNoLogbook
}
Expand All @@ -509,6 +523,13 @@ func (book *Book) WriteVersionSave(ctx context.Context, initID string, ds *datas
return err
}

if rs != nil {
if rs.ID != ds.Commit.RunID {
return fmt.Errorf("dataset.Commit.RunID does not match the provided run.ID")
}
book.appendTransformRun(branchLog, rs)
}

topIndex := book.appendVersionSave(branchLog, ds)
// TODO(dlong): Think about how to handle a failure exactly here, what needs to be rolled back?
err = book.save(ctx)
Expand All @@ -531,6 +552,33 @@ func (book *Book) WriteVersionSave(ctx context.Context, initID string, ds *datas
return nil
}

// WriteTransformRun adds an operation to a log marking the execution of a
// dataset transform script
func (book *Book) WriteTransformRun(ctx context.Context, initID string, rs *run.State) error {
if book == nil {
return ErrNoLogbook
}

log.Debugf("WriteTransformRun: %s", initID)
branchLog, err := book.branchLog(ctx, initID)
if err != nil {
return err
}

if err := book.hasWriteAccess(branchLog.l); err != nil {
return err
}

book.appendTransformRun(branchLog, rs)
// TODO(dlong): Think about how to handle a failure exactly here, what needs to be rolled back?
err = book.save(ctx)
if err != nil {
return err
}

return nil
}

func (book *Book) appendVersionSave(blog *BranchLog, ds *dataset.Dataset) int {
op := oplog.Op{
Type: oplog.OpTypeInit,
Expand All @@ -545,6 +593,27 @@ func (book *Book) appendVersionSave(blog *BranchLog, ds *dataset.Dataset) int {
if ds.Structure != nil {
op.Size = int64(ds.Structure.Length)
}
if ds.Commit.RunID != "" {
op.Relations = []string{fmt.Sprintf("%s%s", runIDRelPrefix, ds.Commit.RunID)}
}

blog.Append(op)

return blog.Size() - 1
}

// appendTransformRun maps fields from run.State to an operation.
func (book *Book) appendTransformRun(blog *BranchLog, rs *run.State) int {
op := oplog.Op{
Type: oplog.OpTypeInit,
Model: RunModel,
Ref: rs.ID,
Name: fmt.Sprintf("%d", rs.Number),

Timestamp: rs.StartTime.UnixNano(),
Size: int64(rs.Duration),
Note: string(rs.Status),
}

blog.Append(op)

Expand Down Expand Up @@ -1018,6 +1087,24 @@ func (book *Book) ConstructDatasetLog(ctx context.Context, ref dsref.Ref, histor
return book.save(ctx)
}

func commitOpRunID(op oplog.Op) string {
for _, str := range op.Relations {
if strings.HasPrefix(str, runIDRelPrefix) {
return strings.TrimPrefix(str, runIDRelPrefix)
}
}
return ""
}

// func versionInfoFromOp(ref dsref.Ref, op oplog.Op) dsref.VersionInfo {
// return dsref.VersionInfo{
// Username: ref.Username,
// ProfileID: ref.ProfileID,
// Name: ref.Name,
// Path: op.Ref,
// CommitTime: time.Unix(0, op.Timestamp),
// BodySize: int(op.Size),

func versionInfoFromOp(ref dsref.Ref, op oplog.Op) dsref.VersionInfo {
return dsref.VersionInfo{
Username: ref.Username,
Expand All @@ -1030,6 +1117,27 @@ func versionInfoFromOp(ref dsref.Ref, op oplog.Op) dsref.VersionInfo {
}
}

func runItemFromOp(ref dsref.Ref, op oplog.Op) dsref.VersionInfo {
return dsref.VersionInfo{
Username: ref.Username,
ProfileID: ref.ProfileID,
Name: ref.Name,
CommitTime: time.Unix(0, op.Timestamp),
RunID: op.Ref,
RunStatus: op.Note,
RunDuration: int(op.Size),
// TODO(B5): read run number, defaulting to -1 in the event of an error
// RunNumber: strconv.ParseInt(op.Name),
}
}

func addCommitDetailsToRunItem(li dsref.VersionInfo, op oplog.Op) dsref.VersionInfo {
li.CommitTitle = op.Note
li.BodySize = int(op.Size)
li.Path = op.Ref
return li
}

// Items collapses the history of a dataset branch into linear log items
func (book Book) Items(ctx context.Context, ref dsref.Ref, offset, limit int) ([]dsref.VersionInfo, error) {
initID, err := book.RefToInitID(dsref.Ref{Username: ref.Username, Name: ref.Name})
Expand Down Expand Up @@ -1060,7 +1168,17 @@ func branchToVersionInfos(blog *BranchLog, ref dsref.Ref, offset, limit int, col
case CommitModel:
switch op.Type {
case oplog.OpTypeInit:
refs = append(refs, versionInfoFromOp(ref, op))
// run operations & commit operations often occur next to each other in
// the log.
// if the last item in the slice has a runID that matches a runID resource
// from this commit, combine them into one Log item that describes both
// the run and the save
commitRunID := commitOpRunID(op)
if commitRunID != "" && len(refs) > 0 && commitRunID == refs[len(refs)-1].RunID {
refs[len(refs)-1] = addCommitDetailsToRunItem(refs[len(refs)-1], op)
} else {
refs = append(refs, versionInfoFromOp(ref, op))
}
case oplog.OpTypeAmend:
deleteAtEnd = 0
refs[len(refs)-1] = versionInfoFromOp(ref, op)
Expand All @@ -1071,6 +1189,9 @@ func branchToVersionInfos(blog *BranchLog, ref dsref.Ref, offset, limit int, col
deleteAtEnd += int(op.Size)
}
}
case RunModel:
// runs are only ever "init" op type
refs = append(refs, runItemFromOp(ref, op))
case PushModel:
switch op.Type {
case oplog.OpTypeInit:
Expand Down
28 changes: 14 additions & 14 deletions logbook/logbook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func Example() {

// create a log record of the version of a dataset. In practice this'll be
// part of the overall save routine that created the above ds variable
if err := book.WriteVersionSave(ctx, initID, ds); err != nil {
if err := book.WriteVersionSave(ctx, initID, ds, nil); err != nil {
panic(err)
}

Expand All @@ -104,7 +104,7 @@ func Example() {
}

// once again, write to the log
if err := book.WriteVersionSave(ctx, initID, ds2); err != nil {
if err := book.WriteVersionSave(ctx, initID, ds2, nil); err != nil {
panic(err)
}

Expand Down Expand Up @@ -147,7 +147,7 @@ func Example() {
}

// once again, write to the log
if err := book.WriteVersionSave(ctx, initID, ds3); err != nil {
if err := book.WriteVersionSave(ctx, initID, ds3, nil); err != nil {
panic(err)
}

Expand Down Expand Up @@ -238,7 +238,7 @@ func TestNilCallable(t *testing.T) {
if err = book.WriteVersionDelete(ctx, initID, 0); err != logbook.ErrNoLogbook {
t.Errorf("expected '%s', got: %v", logbook.ErrNoLogbook, err)
}
if err = book.WriteVersionSave(ctx, initID, nil); err != logbook.ErrNoLogbook {
if err = book.WriteVersionSave(ctx, initID, nil, nil); err != logbook.ErrNoLogbook {
t.Errorf("expected '%s', got: %v", logbook.ErrNoLogbook, err)
}
if _, err = book.ResolveRef(ctx, nil); err != dsref.ErrRefNotFound {
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestWritePermissions(t *testing.T) {
},
Path: "HashOfVersion1",
}
if err := tr.Book.WriteVersionSave(ctx, initID, ds); !errors.Is(err, logbook.ErrAccessDenied) {
if err := tr.Book.WriteVersionSave(ctx, initID, ds, nil); !errors.Is(err, logbook.ErrAccessDenied) {
t.Errorf("WriteVersionSave to an oplog the book author doesn't own must return a wrap of logbook.ErrAccessDenied")
}
if err := tr.Book.WriteVersionAmend(ctx, initID, ds); !errors.Is(err, logbook.ErrAccessDenied) {
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestPushModel(t *testing.T) {
Title: "initial commit",
},
Path: "HashOfVersion1",
})
}, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -639,7 +639,7 @@ func TestDatasetLogNaming(t *testing.T) {
Title: "initial commit",
},
Path: "HashOfVersion1",
})
}, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1052,7 +1052,7 @@ func (tr *testRunner) WriteWorldBankExample(t *testing.T) string {
PreviousPath: "",
}

if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil {
if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil {
panic(err)
}

Expand All @@ -1064,7 +1064,7 @@ func (tr *testRunner) WriteWorldBankExample(t *testing.T) string {
ds.Path = "QmHashOfVersion2"
ds.PreviousPath = "QmHashOfVersion1"

if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil {
if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1104,7 +1104,7 @@ func (tr *testRunner) WriteMoreWorldBankCommits(t *testing.T, initID string) {
PreviousPath: "QmHashOfVersion3",
}

if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil {
if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil {
panic(err)
}

Expand All @@ -1119,7 +1119,7 @@ func (tr *testRunner) WriteMoreWorldBankCommits(t *testing.T, initID string) {
PreviousPath: "QmHashOfVersion4",
}

if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil {
if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil {
panic(err)
}
}
Expand Down Expand Up @@ -1156,7 +1156,7 @@ func (tr *testRunner) WriteRenameExample(t *testing.T) {
PreviousPath: "",
}

if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil {
if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil {
panic(err)
}

Expand All @@ -1165,7 +1165,7 @@ func (tr *testRunner) WriteRenameExample(t *testing.T) {
ds.Path = "QmHashOfVersion2"
ds.PreviousPath = "QmHashOfVersion1"

if err := book.WriteVersionSave(tr.Ctx, initID, ds); err != nil {
if err := book.WriteVersionSave(tr.Ctx, initID, ds, nil); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1296,7 +1296,7 @@ func GenerateExampleOplog(ctx context.Context, t *testing.T, journal *logbook.Bo
},
Path: headPath,
PreviousPath: "",
})
}, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
10 changes: 5 additions & 5 deletions logbook/logsync/logsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,14 +488,14 @@ func writeNasdaqLogs(ctx context.Context, book *logbook.Book) (ref dsref.Ref, er
PreviousPath: "",
}

if err = book.WriteVersionSave(ctx, initID, ds); err != nil {
if err = book.WriteVersionSave(ctx, initID, ds, nil); err != nil {
return ref, err
}

ds.Path = "v1"
ds.PreviousPath = "v0"

if err = book.WriteVersionSave(ctx, initID, ds); err != nil {
if err = book.WriteVersionSave(ctx, initID, ds, nil); err != nil {
return ref, err
}

Expand Down Expand Up @@ -529,21 +529,21 @@ func writeWorldBankLogs(ctx context.Context, book *logbook.Book) (ref dsref.Ref,
PreviousPath: "",
}

if err = book.WriteVersionSave(ctx, initID, ds); err != nil {
if err = book.WriteVersionSave(ctx, initID, ds, nil); err != nil {
return ref, err
}

ds.Path = "/ipfs/QmVersion1"
ds.PreviousPath = "/ipfs/QmVesion0"

if err = book.WriteVersionSave(ctx, initID, ds); err != nil {
if err = book.WriteVersionSave(ctx, initID, ds, nil); err != nil {
return ref, err
}

ds.Path = "/ipfs/QmVersion2"
ds.PreviousPath = "/ipfs/QmVersion1"

if err = book.WriteVersionSave(ctx, initID, ds); err != nil {
if err = book.WriteVersionSave(ctx, initID, ds, nil); err != nil {
return ref, err
}

Expand Down
Loading

0 comments on commit 7d0cb91

Please sign in to comment.