Skip to content

Commit

Permalink
Merge pull request #12119 from RaduBerinde/distsql-join-minor-fixes
Browse files Browse the repository at this point in the history
distsql: small fixes (debug logs, hash join encoding problem)
  • Loading branch information
RaduBerinde committed Dec 7, 2016
2 parents 5850968 + d45ad26 commit 76d810b
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/distsql/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (n *noopProcessor) Run(wg *sync.WaitGroup) {
return
}
if log.V(3) {
log.Infof(n.flowCtx.Context, "noop: pushing row %s\n", row)
log.Infof(n.flowCtx.Context, "noop: pushing row %s", row)
}
if !n.output.PushRow(row) {
return
Expand Down
32 changes: 27 additions & 5 deletions pkg/sql/distsql/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package distsql
import (
"sync"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -83,18 +85,18 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
defer log.Infof(ctx, "exiting hash joiner run")
}

if err := h.buildPhase(); err != nil {
if err := h.buildPhase(ctx); err != nil {
h.output.Close(err)
return
}
err := h.probePhase()
err := h.probePhase(ctx)
h.output.Close(err)
}

// buildPhase constructs our internal hash map of rows seen, this is done
// entirely from the right stream with the encoding/group key generated using the
// left equality columns.
func (h *hashJoiner) buildPhase() error {
func (h *hashJoiner) buildPhase(ctx context.Context) error {
var scratch []byte
for {
rrow, err := h.inputs[1].NextRow()
Expand All @@ -117,6 +119,9 @@ func (h *hashJoiner) buildPhase() error {
if err != nil {
return err
}
if log.V(3) && row != nil {
log.Infof(ctx, "pushing row %s", row)
}
if row != nil && !h.output.PushRow(row) {
return nil
}
Expand All @@ -135,7 +140,7 @@ func (h *hashJoiner) buildPhase() error {
// merging of the two rows if matched. Behaviour for outer joins is as expected,
// i.e. for RIGHT OUTER joins if no corresponding left row is seen an empty
// DNull row is emitted instead.
func (h *hashJoiner) probePhase() error {
func (h *hashJoiner) probePhase(ctx context.Context) error {
var scratch []byte
for {
lrow, err := h.inputs[0].NextRow()
Expand All @@ -160,6 +165,9 @@ func (h *hashJoiner) probePhase() error {
if err != nil {
return err
}
if log.V(3) && row != nil {
log.Infof(ctx, "pushing row %s", row)
}
if row != nil && !h.output.PushRow(row) {
return nil
}
Expand All @@ -173,6 +181,9 @@ func (h *hashJoiner) probePhase() error {
if err != nil {
return err
}
if log.V(3) && row != nil {
log.Infof(ctx, "pushing row %s", row)
}
if row != nil && !h.output.PushRow(row) {
return nil
}
Expand All @@ -184,6 +195,9 @@ func (h *hashJoiner) probePhase() error {
if err != nil {
return err
}
if log.V(3) && row != nil {
log.Infof(ctx, "pushing row %s", row)
}
if row != nil && !h.output.PushRow(row) {
return nil
}
Expand All @@ -203,6 +217,9 @@ func (h *hashJoiner) probePhase() error {
if err != nil {
return err
}
if log.V(3) && row != nil {
log.Infof(ctx, "pushing row %s", row)
}
if row != nil && !h.output.PushRow(row) {
return nil
}
Expand All @@ -223,7 +240,12 @@ func (h *hashJoiner) encode(
if row[colIdx].IsNull() {
return nil, true, nil
}
appendTo, err = row[colIdx].Encode(&h.datumAlloc, sqlbase.DatumEncoding_VALUE, appendTo)
// Note: we cannot compare VALUE encodings because they contain column IDs
// which can vary.
// TODO(radu): we should figure out what encoding is readily available and
// use that (though it needs to be consistent across all rows). We could add
// functionality to compare VALUE encodings ignoring the column ID.
appendTo, err = row[colIdx].Encode(&h.datumAlloc, sqlbase.DatumEncoding_ASCENDING_KEY, appendTo)
if err != nil {
return appendTo, false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func ProcessInboundStream(
break
}
if log.V(3) {
log.Infof(ctx, "inbound stream pushing row %s\n", row)
log.Infof(ctx, "inbound stream pushing row %s", row)
}
if !dst.PushRow(row) {
// Rest of rows not needed.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (jr *joinReader) mainLoop() error {
break
}
if log.V(3) {
log.Infof(ctx, "pushing row %s\n", outRow)
log.Infof(ctx, "pushing row %s", outRow)
}
// Push the row to the output RowReceiver; stop if they don't need more
// rows.
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsql/sorterstrategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (ss *sortAllStrategy) Execute(s *sorter) error {
}

if log.V(3) {
log.Infof(s.ctx, "pushing row %s\n", row)
log.Infof(s.ctx, "pushing row %s", row)
}

// Push the row to the output; stop if they don't need more rows.
Expand Down Expand Up @@ -201,7 +201,7 @@ func (ss *sortTopKStrategy) Execute(s *sorter) error {
}

if log.V(3) {
log.Infof(s.ctx, "pushing row %s\n", row)
log.Infof(s.ctx, "pushing row %s", row)
}

// Push the row to the output; stop if they don't need more rows.
Expand Down Expand Up @@ -272,7 +272,7 @@ func (ss *sortChunksStrategy) Execute(s *sorter) error {
// for the first s.matchLen ordering columns.
for {
if log.V(3) {
log.Infof(s.ctx, "pushing row %s\n", nextRow)
log.Infof(s.ctx, "pushing row %s", nextRow)
}
ss.Add(nextRow)

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql/tablereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (tr *tableReader) Run(wg *sync.WaitGroup) {
return
}
if log.V(3) {
log.Infof(ctx, "pushing row %s\n", outRow)
log.Infof(ctx, "pushing row %s", outRow)
}
// Push the row to the output RowReceiver; stop if they don't need more
// rows.
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlbase/encoded_datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ func (ed *EncDatum) Encoding() (DatumEncoding, bool) {

// Encode appends the encoded datum to the given slice using the requested
// encoding.
// Note: DatumEncoding_VALUE encodings are not unique because they can contain
// a column ID so they should not be used to test for equality.
func (ed *EncDatum) Encode(a *DatumAlloc, enc DatumEncoding, appendTo []byte) ([]byte, error) {
if ed.encoded != nil && enc == ed.encoding {
// We already have an encoding that matches
Expand Down

0 comments on commit 76d810b

Please sign in to comment.