Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update storage api to support query operations by spanKind #1

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,20 +415,29 @@ func TestGetServicesFailureGRPC(t *testing.T) {
func TestGetOperationsSuccessGRPC(t *testing.T) {
withServerAndClient(t, func(server *grpcServer, client *grpcClient) {

expectedOperations := []string{"", "get"}
server.spanReader.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "abc/trifle").Return(expectedOperations, nil).Once()
expectedOperations := []*spanstore.Operation{{Name: ""}, {Name: "get", SpanKind: "server"}}
server.spanReader.On("GetOperations",
mock.AnythingOfType("*context.valueCtx"),
&spanstore.OperationQueryParameters{ServiceName: "abc/trifle"},
).Return(expectedOperations, nil).Once()

res, err := client.GetOperations(context.Background(), &api_v2.GetOperationsRequest{
Service: "abc/trifle",
})
assert.NoError(t, err)
assert.Equal(t, expectedOperations, res.Operations)
assert.Equal(t, len(expectedOperations), len(res.Operations))
for idx, actualOp := range res.Operations {
assert.Equal(t, expectedOperations[idx].Name, actualOp)
}
})
}

func TestGetOperationsFailureGRPC(t *testing.T) {
withServerAndClient(t, func(server *grpcServer, client *grpcClient) {
server.spanReader.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "trifle").Return(nil, errStorageGRPC).Once()
server.spanReader.On("GetOperations",
mock.AnythingOfType("*context.valueCtx"),
&spanstore.OperationQueryParameters{ServiceName: "trifle"},
).Return(nil, errStorageGRPC).Once()

_, err := client.GetOperations(context.Background(), &api_v2.GetOperationsRequest{
Service: "trifle",
Expand Down
39 changes: 26 additions & 13 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
testHttp "github.com/stretchr/testify/http"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
jaeger "github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -483,17 +483,17 @@ func TestGetServicesStorageFailure(t *testing.T) {
func TestGetOperationsSuccess(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
expectedOperations := []string{"", "get"}
readMock.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "abc/trifle").Return(expectedOperations, nil).Once()
expectedOperations := []*spanstore.Operation{{Name: ""}, {Name: "get", SpanKind: "server"}}
readMock.On("GetOperations", mock.AnythingOfType("*context.valueCtx"),
&spanstore.OperationQueryParameters{ServiceName: "abc/trifle"}).Return(expectedOperations, nil).Once()

var response structuredResponse
err := getJSON(server.URL+"/api/operations?service=abc%2Ftrifle", &response)
assert.NoError(t, err)
actualOperations := make([]string, len(expectedOperations))
for i, s := range response.Data.([]interface{}) {
actualOperations[i] = s.(string)
assert.Equal(t, expectedOperations[i].Name, s.(string))
}
assert.Equal(t, expectedOperations, actualOperations)

}

func TestGetOperationsNoServiceName(t *testing.T) {
Expand All @@ -506,9 +506,12 @@ func TestGetOperationsNoServiceName(t *testing.T) {
}

func TestGetOperationsStorageFailure(t *testing.T) {
server, reaMock, _ := initializeTestServer()
server, readMock, _ := initializeTestServer()
defer server.Close()
reaMock.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "trifle").Return(nil, errStorage).Once()
readMock.On(
"GetOperations",
mock.AnythingOfType("*context.valueCtx"),
mock.AnythingOfType("*spanstore.OperationQueryParameters")).Return(nil, errStorage).Once()

var response structuredResponse
err := getJSON(server.URL+"/api/operations?service=trifle", &response)
Expand All @@ -518,8 +521,16 @@ func TestGetOperationsStorageFailure(t *testing.T) {
func TestGetOperationsLegacySuccess(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
expectedOperations := []string{"", "get"}
readMock.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "abc/trifle").Return(expectedOperations, nil).Once()
expectedOperationNames := []string{"", "get"}
expectedOperations := []*spanstore.Operation{{Name: ""}, {Name: "get", SpanKind: "server"}}
readMock.On("GetOperations",
mock.AnythingOfType("*context.valueCtx"),
"abc/trifle",
).Return(expectedOperationNames, nil).Once()
readMock.On(
"GetOperations",
mock.AnythingOfType("*context.valueCtx"),
mock.AnythingOfType("*spanstore.OperationQueryParameters")).Return(expectedOperations, nil).Once()

var response structuredResponse
err := getJSON(server.URL+"/api/services/abc%2Ftrifle/operations", &response)
Expand All @@ -528,14 +539,16 @@ func TestGetOperationsLegacySuccess(t *testing.T) {
for i, s := range response.Data.([]interface{}) {
actualOperations[i] = s.(string)
}
assert.Equal(t, expectedOperations, actualOperations)
assert.Equal(t, expectedOperationNames, actualOperations)
}

func TestGetOperationsLegacyStorageFailure(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
readMock.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "trifle").Return(nil, errStorage).Once()

readMock.On(
"GetOperations",
mock.AnythingOfType("*context.valueCtx"),
mock.AnythingOfType("*spanstore.OperationQueryParameters")).Return(nil, errStorage).Once()
var response structuredResponse
err := getJSON(server.URL+"/api/services/trifle/operations", &response)
assert.Error(t, err)
Expand Down
13 changes: 12 additions & 1 deletion cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,18 @@ func (qs QueryService) GetServices(ctx context.Context) ([]string, error) {

// GetOperations is the queryService implementation of spanstore.Reader.GetOperations
func (qs QueryService) GetOperations(ctx context.Context, service string) ([]string, error) {
return qs.spanReader.GetOperations(ctx, service)
operations, err := qs.spanReader.GetOperations(ctx, &spanstore.OperationQueryParameters{
guo0693 marked this conversation as resolved.
Show resolved Hide resolved
ServiceName: service,
})
if err != nil {
return nil, err
}

names := make([]string, len(operations))
for idx, operation := range operations {
names[idx] = operation.Name
}
return names, err
}

// FindTraces is the queryService implementation of spanstore.Reader.FindTraces
Expand Down
9 changes: 6 additions & 3 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,17 @@ func TestGetServices(t *testing.T) {
// Test QueryService.GetOperations() for success.
func TestGetOperations(t *testing.T) {
qs, readMock, _ := initializeTestService()
expectedOperations := []string{"", "get"}
readMock.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "abc/trifle").Return(expectedOperations, nil).Once()
expectedOperationNames := []string{"", "get"}
expectedOperations := []*spanstore.Operation{{Name: ""}, {Name: "get", SpanKind: "server"}}
readMock.On("GetOperations",
mock.AnythingOfType("*context.valueCtx"),
mock.AnythingOfType("*spanstore.OperationQueryParameters")).Return(expectedOperations, nil).Once()

type contextKey string
ctx := context.Background()
actualOperations, err := qs.GetOperations(context.WithValue(ctx, contextKey("foo"), "bar"), "abc/trifle")
assert.NoError(t, err)
assert.Equal(t, expectedOperations, actualOperations)
assert.Equal(t, expectedOperationNames, actualOperations)
}

// Test QueryService.FindTraces() for success.
Expand Down
17 changes: 13 additions & 4 deletions plugin/storage/badger/spanstore/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"time"

"github.com/dgraph-io/badger"

"github.com/jaegertracing/jaeger/storage/spanstore"
)

// CacheStore saves expensive calculations from the K/V store
Expand Down Expand Up @@ -125,8 +127,8 @@ func (c *CacheStore) Update(service, operation string, expireTime uint64) {
c.cacheLock.Unlock()
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (c *CacheStore) GetOperations(service string) ([]string, error) {
// GetOperations returns all operations for a specific service & spanKind traced by Jaeger
func (c *CacheStore) GetOperations(service string) ([]*spanstore.Operation, error) {
operations := make([]string, 0, len(c.services))
t := uint64(time.Now().Unix())
c.cacheLock.Lock()
Expand All @@ -137,7 +139,7 @@ func (c *CacheStore) GetOperations(service string) ([]string, error) {
// Expired, remove
delete(c.services, service)
delete(c.operations, service)
return operations, nil // empty slice rather than nil
return []*spanstore.Operation{}, nil // empty slice rather than nil
}
for o, e := range c.operations[service] {
if e > t {
Expand All @@ -150,7 +152,14 @@ func (c *CacheStore) GetOperations(service string) ([]string, error) {

sort.Strings(operations)

return operations, nil
//TODO: return the operations with actual spanKind
result := make([]*spanstore.Operation, 0, len(operations))
for _, op := range operations {
result = append(result, &spanstore.Operation{
Name: op,
})
}
return result, nil
}

// GetServices returns all services traced by Jaeger
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/badger/spanstore/read_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ func TestMenuSeeks(t *testing.T) {
}
}

operations, err := sr.GetOperations(context.Background(), "service-1")
operations, err := sr.GetOperations(context.Background(),
&spanstore.OperationQueryParameters{ServiceName: "service-1", SpanKind: ""})
assert.NoError(t, err)

serviceList, err := sr.GetServices(context.Background())
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/badger/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,9 @@ func (r *TraceReader) GetServices(ctx context.Context) ([]string, error) {
}

// GetOperations fetches operations in the service and empty slice if service does not exists
func (r *TraceReader) GetOperations(ctx context.Context, service string) ([]string, error) {
return r.cache.GetOperations(service)
func (r *TraceReader) GetOperations(ctx context.Context, query *spanstore.OperationQueryParameters) (
[]*spanstore.Operation, error) {
return r.cache.GetOperations(query.ServiceName)
}

// setQueryDefaults alters the query with defaults if certain parameters are not set
Expand Down
24 changes: 7 additions & 17 deletions plugin/storage/cassandra/spanstore/operation_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type tableMeta struct {
queryByKindStmt string
queryStmt string
createWriteQuery func(query cassandra.Query, service, kind, opName string) cassandra.Query
getOperations func(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error)
getOperations func(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) (
[]*spanstore.Operation, error)
}

func (t *tableMeta) materialize() {
Expand Down Expand Up @@ -126,10 +127,8 @@ func NewOperationNamesStorage(
}

// Write saves Operation and Service name tuples
func (s *OperationNamesStorage) Write(serviceName string, operationName string) error {
func (s *OperationNamesStorage) Write(serviceName, operationName, spanKind string) error {
var err error
//TODO: take spanKind from args
spanKind := ""

if inCache := checkWriteCache(serviceName+"|"+spanKind+"|"+operationName, s.operationNames, s.writeCacheTTL); !inCache {
q := s.table.createWriteQuery(s.session.Query(s.table.insertStmt), serviceName, spanKind, operationName)
Expand All @@ -142,20 +141,11 @@ func (s *OperationNamesStorage) Write(serviceName string, operationName string)
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *OperationNamesStorage) GetOperations(service string) ([]string, error) {
operations, err := s.table.getOperations(s, &spanstore.OperationQueryParameters{
ServiceName: service,
func (s *OperationNamesStorage) GetOperations(query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error) {
return s.table.getOperations(s, &spanstore.OperationQueryParameters{
ServiceName: query.ServiceName,
SpanKind: query.SpanKind,
})

if err != nil {
return nil, err
}
//TODO: return operations instead of list of string
operationNames := make([]string, len(operations))
for idx, operation := range operations {
operationNames[idx] = operation.Name
}
return operationNames, err
}

func tableExist(session cassandra.Session, tableName string) bool {
Expand Down
30 changes: 21 additions & 9 deletions plugin/storage/cassandra/spanstore/operation_names_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type operationNameStorageTest struct {
Expand All @@ -49,6 +50,9 @@ func withOperationNamesStorage(writeCacheTTL time.Duration,
query := &mocks.Query{}
session.On("Query",
fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName), mock.Anything).Return(query)
session.On("Query",
fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName),
mock.Anything).Return(query)
if schemaVersion != latestVersion {
query.On("Exec").Return(errors.New("new table does not exist"))
} else {
Expand Down Expand Up @@ -98,12 +102,14 @@ func TestOperationNamesStorageWrite(t *testing.T) {

s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query)

err := s.storage.Write("service-a", "Operation-b")
err := s.storage.Write("service-a", "Operation-b", "")
assert.NoError(t, err)

err = s.storage.Write("service-c", "operation-d")
err = s.storage.Write("service-c", "operation-d", "")
assert.EqualError(t, err,
"failed to Exec query 'select from "+schemas[test.schemaVersion].tableName+"': exec error")
"failed to Exec query 'select from "+
schemas[test.schemaVersion].tableName+
"': exec error")
assert.Equal(t, map[string]string{
"level": "error",
"msg": "Failed to exec query",
Expand All @@ -119,7 +125,7 @@ func TestOperationNamesStorageWrite(t *testing.T) {
}, counts, "after first two writes")

// write again
err = s.storage.Write("service-a", "Operation-b")
err = s.storage.Write("service-a", "Operation-b", "")
assert.NoError(t, err)

counts2, _ := s.metricsFactory.Snapshot()
Expand Down Expand Up @@ -168,16 +174,22 @@ func TestOperationNamesStorageGetServices(t *testing.T) {
query.On("Iter").Return(iter)

s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query)
services, err := s.storage.GetOperations("service-a")
services, err := s.storage.GetOperations(&spanstore.OperationQueryParameters{
ServiceName: "service-a",
})
if test.expErr == nil {
assert.NoError(t, err)
// expect empty string because mock iter.Scan(&placeholder) does not write to `placeholder`
assert.Equal(t, []string{""}, services)
// expect one empty operation result
// because mock iter.Scan(&placeholder) does not write to `placeholder`
assert.Equal(t, []*spanstore.Operation{{}}, services)
} else {
assert.EqualError(t, err,
assert.EqualError(
t,
err,
fmt.Sprintf("Error reading %s from storage: %s",
schemas[test.schemaVersion].tableName,
test.expErr.Error()))
test.expErr.Error()),
)
}
})
})
Expand Down
7 changes: 4 additions & 3 deletions plugin/storage/cassandra/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (

type serviceNamesReader func() ([]string, error)

type operationNamesReader func(service string) ([]string, error)
type operationNamesReader func(query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error)

type spanReaderMetrics struct {
readTraces *casMetrics.Table
Expand Down Expand Up @@ -143,8 +143,9 @@ func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) {
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *SpanReader) GetOperations(ctx context.Context, service string) ([]string, error) {
return s.operationNamesReader(service)
func (s *SpanReader) GetOperations(ctx context.Context, query *spanstore.OperationQueryParameters) (
[]*spanstore.Operation, error) {
return s.operationNamesReader(query)
}

func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) {
Expand Down
Loading