feat(save): recall transform on empty --apply, write run operations

On save, the transform component is recalled from history if no transform
component is provided to save. This used to be accomplished with a "recall: tf"
param, but I've kept it as a transform-specific feature this go-round; I think
it ties in a little better with the new --apply semantics.
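
In sketch form the recall path looks roughly like the change to lib/datasets.go
below (a condensed sketch that assumes the surrounding Save method's p, ds, ref,
isNew, and instance fields):

    // if --apply was passed but no transform component came in with this save,
    // fall back to the transform from the latest version in history
    if p.Apply && ds.Transform == nil {
        if isNew {
            return nil, fmt.Errorf("cannot apply while saving without a transform")
        }
        prev, err := base.LoadRevs(ctx, m.inst.qfs, ref, []*dsref.Rev{{Field: "tf", Gen: 1}})
        if err != nil {
            return nil, fmt.Errorf("loading transform component from history: %w", err)
        }
        ds.Transform = prev.Transform
    }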

When saving with a transform we now build up a *run.State via an event
subscription scoped to the RunID. I'd initially experimented with designs that
had runState built and returned within `transform.Apply`, but found it too easy
to create a data race when transform.Apply's wait argument is false. In that
case transform.Apply has no clear moment at which to return a run.State value
without handing back a channel, or adding mutex locks & accessor methods to
run.State. It's easier to just build run.State from an external event listener.
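
The listener approach, condensed from the new code in lib/datasets.go (this
sketch assumes the bus's SubscribeID takes the handler followed by the run ID,
as the call site suggests):

    runID := transform.NewRunID()
    runState := run.NewState(runID)

    // accumulate transform events into runState from outside transform.Apply,
    // so Apply never has to hand a run.State value back across goroutines
    m.inst.bus.SubscribeID(func(ctx context.Context, e event.Event) error {
        runState.AddTransformEvent(e)
        return nil
    }, runID)

    // shouldWait=true blocks until the transform script completes
    err := transform.Apply(ctx, ds, loader, runID, m.inst.bus, true, str, scriptOut, secrets)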

This commit introduces automation failure logging, but creates a new issue in
the process: when a user "manually" invokes save with the --apply flag and the
transform fails, a record of that failure is written to logbook, which might
feel counterintuitive. The problem is that we don't pass save any detail that
lets us disambiguate automation-driven `save` calls from user-driven ones.
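
One possible follow-up, purely illustrative and not part of this commit: a
hypothetical field on SaveParams that the automation layer sets, so Save could
skip (or annotate) the failure record for user-driven runs.

    // hypothetical sketch, not in this commit
    type SaveParams struct {
        // ... existing fields ...

        // Automated marks saves triggered by the automation system rather
        // than a user running `qri save --apply` directly
        Automated bool
    }

    // then, in Save's transform-failure path, only automation-driven calls
    // would write the failed run to logbook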
b5 committed Feb 17, 2021
1 parent 7d0cb91 commit 3949be9
Showing 6 changed files with 77 additions and 28 deletions.
29 changes: 17 additions & 12 deletions cmd/save.go
@@ -21,29 +21,34 @@ func NewSaveCommand(f Factory, ioStreams ioes.IOStreams) *cobra.Command {
Use: "save [DATASET]",
Aliases: []string{"commit"},
Short: "save changes to a dataset",
Long: `Save is how you change a dataset, updating one or more of data, metadata, and structure.
You can also update your data via url. Every time you run save, an entry is added to
your dataset’s log (which you can see by running ` + "`qri log <dataset_reference>`" + `).
Long: `
Save is how you change a dataset, updating one or more dataset components. Every
time you run save, an entry is added to your dataset’s log (which you can see by
running `[1:] + "`qri log <dataset_reference>`" + `).
If the dataset you're changing has defined a transform, running ` + "`qri save`" + `
will re execute the transform. To only re-run the transform, run save with no args.
Dataset changes can be automated with a transform component and the --apply flag
For more on transforms see https://qri.io/docs/transforms/overview
If the dataset you're changing has a transform, running ` + "`qri save --apply`" + `
will re-execute it to produce a new version
Every time you save, you can provide a message about what you changed and why.
If you don’t provide a message Qri will automatically generate one for you.
When you make an update and save a dataset that you originally added from a different
peer, the dataset gets renamed from ` + "`peers_name/dataset_name`" + ` to ` + "`my_name/dataset_name`" + `.
The ` + "`--message`" + `" and ` + "`--title`" + ` flags allow you to add a
commit message and title to the save.`,
commit message and title to the save.
When you make an update and save a dataset that you originally added from a
different peer, the dataset gets renamed from ` + "`peers_name/dataset_name`" + ` to
` + "`my_name/dataset_name`" + `.`,
Example: ` # Save updated data to dataset annual_pop:
$ qri save --body /path/to/data.csv me/annual_pop
# Save updated dataset (no data) to annual_pop:
$ qri save --file /path/to/dataset.yaml me/annual_pop
# Re-execute a dataset that has a transform:
$ qri save me/tf_dataset`,
# Re-execute the latest transform from history:
$ qri save --apply me/tf_dataset`,
Annotations: map[string]string{
"group": "dataset",
},
2 changes: 1 addition & 1 deletion dsref/spec/resolve.go
@@ -220,7 +220,7 @@ func GenerateExampleOplog(ctx context.Context, journal *logbook.Book, dsname, he
},
Path: headPath,
PreviousPath: "",
})
}, nil)
if err != nil {
return "", nil, err
}
6 changes: 3 additions & 3 deletions event/transform.go
@@ -9,13 +9,13 @@ const (
ETTransformStop = Type("tf:Stop")

// ETTransformStepStart signals a step is starting.
// Payload will be a StepDetail
// Payload will be a TransformStepLifecycle
ETTransformStepStart = Type("tf:StepStart")
// ETTransformStepStop signals a step has stopped.
// Payload will be a StepDetail
// Payload will be a TransformStepLifecycle
ETTransformStepStop = Type("tf:StepStop")
// ETTransformStepSkip signals a step was skipped.
// Payload will be a StepDetail
// Payload will be a TransformStepLifecycle
ETTransformStepSkip = Type("tf:StepSkip")

// ETTransformPrint is sent by print commands.
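
For reference, a subscriber can switch on these step events. A minimal sketch,
treating the TransformStepLifecycle payload opaquely since its fields aren't
shown in this diff:

    handler := func(ctx context.Context, e event.Event) error {
        switch e.Type {
        case event.ETTransformStepStart:
            // payload documented above as an event.TransformStepLifecycle
            fmt.Println("step started")
        case event.ETTransformStepStop:
            fmt.Println("step stopped")
        case event.ETTransformStepSkip:
            fmt.Println("step skipped")
        }
        return nil
    }
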
55 changes: 50 additions & 5 deletions lib/datasets.go
@@ -36,6 +36,7 @@ import (
"github.com/qri-io/qri/repo"
reporef "github.com/qri-io/qri/repo/ref"
"github.com/qri-io/qri/transform"
"github.com/qri-io/qri/transform/run"
)

// DatasetMethods encapsulates business logic for working with Datasets on Qri
@@ -835,7 +836,9 @@ func (m *DatasetMethods) Save(ctx context.Context, p *SaveParams) (*dataset.Data
}
}

if !p.Force && p.Drop == "" &&
if !p.Force &&
!p.Apply &&
p.Drop == "" &&
ds.BodyPath == "" &&
ds.Body == nil &&
ds.BodyBytes == nil &&
@@ -852,25 +855,42 @@ func (m *DatasetMethods) Save(ctx context.Context, p *SaveParams) (*dataset.Data
return nil, err
}

// runState holds the results of transform application. will be non-nil if a
// transform is applied while saving
var runState *run.State

// If applying a transform, execute its script before saving
if p.Apply {
if ds.Transform == nil {
return nil, fmt.Errorf("cannot apply while saving without a transform")
// if no transform component exists, load the latest transform component
// from history
if isNew {
return nil, fmt.Errorf("cannot apply while saving without a transform")
}

prevTransformDataset, err := base.LoadRevs(ctx, m.inst.qfs, ref, []*dsref.Rev{{Field: "tf", Gen: 1}})
if err != nil {
return nil, fmt.Errorf("loading transform component from history: %w", err)
}
ds.Transform = prevTransformDataset.Transform
}

str := m.inst.node.LocalStreams
scriptOut := p.ScriptOutput
secrets := p.Secrets

// allocate an ID for the transform, subscribe to print output & build up
// runState
runID := transform.NewRunID()
runState = run.NewState(runID)
// create a loader so transforms can call `load_dataset`
// TODO(b5) - add a ResolverMode save parameter and call m.inst.resolverForMode
// on the passed in mode string instead of just using the default resolver
// cmd can then define "remote" and "offline" flags, that set the ResolverMode
// string and control how transform functions
loader := NewParseResolveLoadFunc("", m.inst.defaultResolver(), m.inst)

// allocate an ID for the transform, for now just log the events it produces
runID := transform.NewRunID()
m.inst.bus.SubscribeID(func(ctx context.Context, e event.Event) error {
runState.AddTransformEvent(e)
if e.Type == event.ETTransformPrint {
if msg, ok := e.Payload.(event.TransformMessage); ok {
if p.ScriptOutput != nil {
@@ -886,8 +906,17 @@ func (m *DatasetMethods) Save(ctx context.Context, p *SaveParams) (*dataset.Data
shouldWait := true
err := transform.Apply(ctx, ds, loader, runID, m.inst.bus, shouldWait, str, scriptOut, secrets)
if err != nil {
log.Errorw("transform run error", "err", err.Error())
runState.Message = err.Error()
if err := m.inst.logbook.WriteTransformRun(ctx, ref.InitID, runState); err != nil {
log.Debugw("writing errored transform run to logbook:", "err", err.Error())
return nil, err
}

return nil, err
}

ds.Commit.RunID = runID
}

if fsiPath != "" && p.Drop != "" {
@@ -911,10 +940,26 @@ func (m *DatasetMethods) Save(ctx context.Context, p *SaveParams) (*dataset.Data
}
savedDs, err := base.SaveDataset(ctx, m.inst.repo, writeDest, ref.InitID, ref.Path, ds, switches)
if err != nil {
// datasets that are unchanged & have a runState record a record of no-changes
// to logbook
if errors.Is(err, dsfs.ErrNoChanges) && runState != nil {
runState.Status = run.RSUnchanged
runState.Message = err.Error()
if err := m.inst.logbook.WriteTransformRun(ctx, ref.InitID, runState); err != nil {
log.Debugw("writing unchanged transform run to logbook:", "err", err.Error())
return nil, err
}
}

log.Debugf("create ds error: %s\n", err.Error())
return nil, err
}

// Write the save to logbook
if err = m.inst.logbook.WriteVersionSave(ctx, ref.InitID, savedDs, runState); err != nil {
return nil, err
}

success = true

// TODO (b5) - this should be integrated into base.SaveDataset
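
Condensed, the new run bookkeeping in Save works like this (a summary sketch of
the diff above with error handling trimmed; applyErr and saveErr stand in for
the errors at each step):

    // transform script failed: record the failed run, then surface the error
    runState.Message = applyErr.Error()
    m.inst.logbook.WriteTransformRun(ctx, ref.InitID, runState)

    // the transform ran but produced no new version: record an unchanged run
    if errors.Is(saveErr, dsfs.ErrNoChanges) && runState != nil {
        runState.Status = run.RSUnchanged
        runState.Message = saveErr.Error()
        m.inst.logbook.WriteTransformRun(ctx, ref.InitID, runState)
    }

    // successful saves pass runState through to the version log entry
    m.inst.logbook.WriteVersionSave(ctx, ref.InitID, savedDs, runState)
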
2 changes: 1 addition & 1 deletion logbook/logbook.go
@@ -1125,7 +1125,7 @@ func runItemFromOp(ref dsref.Ref, op oplog.Op) dsref.VersionInfo {
CommitTime: time.Unix(0, op.Timestamp),
RunID: op.Ref,
RunStatus: op.Note,
RunDuration: int(op.Size),
RunDuration: int64(op.Size),
// TODO(B5): read run number, defaulting to -1 in the event of an error
// RunNumber: strconv.ParseInt(op.Name),
}
11 changes: 5 additions & 6 deletions transform/apply.go
@@ -6,12 +6,12 @@ import (
"fmt"
"io"

"github.com/google/uuid"
golog "github.com/ipfs/go-log"
"github.com/qri-io/dataset"
"github.com/qri-io/ioes"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/transform/run"
"github.com/qri-io/qri/transform/startf"
)

@@ -38,6 +38,10 @@ const (
StatusSkipped = "skipped"
)

// NewRunID aliases the run identifier creation function to avoid requiring the
// run package to invoke Apply
var NewRunID = run.NewID

// Apply applies the transform script in order to modify the changing dataset
func Apply(
ctx context.Context,
@@ -223,8 +227,3 @@ func Apply(
err = <-doneCh
return err
}

// NewRunID creates a run identifier
func NewRunID() string {
return uuid.New().String()
}
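
With the alias in place, callers can start a run with only the transform package
imported. A sketch; the argument list follows the call site in lib/datasets.go
above:

    runID := transform.NewRunID() // equivalent to run.NewID(), no transform/run import needed
    err := transform.Apply(ctx, ds, loader, runID, bus, true, streams, scriptOut, secrets)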
