Create new cassandra table "operation_names_v2" with "spanKind" column for operation name index

- add migration script
- read from the latest table if available, otherwise fall back to the previous table

Signed-off-by: Jun Guo <guo0693@gmail.com>
guo0693 committed Nov 19, 2019
1 parent b467041 commit 75870c6
Showing 7 changed files with 511 additions and 55 deletions.
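
The commit message above says the reader should use the new operation_names_v2 table when it exists and fall back to operation_names otherwise. Below is a minimal, illustrative Go sketch of that fallback pattern using the gocql driver; the tableExists helper, host, keyspace, and service name are assumptions for the example, not the actual Jaeger implementation.

package main

import (
    "fmt"
    "log"

    "github.com/gocql/gocql"
)

// tableExists is an assumed helper: it probes system_schema to check whether
// a table is present in the keyspace. Not the actual Jaeger code.
func tableExists(session *gocql.Session, keyspace, table string) bool {
    var name string
    err := session.Query(
        `SELECT table_name FROM system_schema.tables WHERE keyspace_name = ? AND table_name = ?`,
        keyspace, table,
    ).Scan(&name)
    return err == nil
}

func main() {
    cluster := gocql.NewCluster("127.0.0.1") // assumed local Cassandra node
    cluster.Keyspace = "jaeger_v1_test"      // keyspace from the sample usage below
    session, err := cluster.CreateSession()
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()

    // Prefer the new table; fall back to the previous one if it is absent.
    table := "operation_names"
    if tableExists(session, cluster.Keyspace, "operation_names_v2") {
        table = "operation_names_v2"
    }

    iter := session.Query(
        fmt.Sprintf(`SELECT operation_name FROM %s WHERE service_name = ?`, table),
        "frontend", // hypothetical service name
    ).Iter()
    var op string
    for iter.Scan(&op) {
        fmt.Println(op)
    }
    if err := iter.Close(); err != nil {
        log.Fatal(err)
    }
}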
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory_test.go
@@ -91,7 +91,7 @@ func TestCassandraFactory(t *testing.T) {
_, err = f.CreateArchiveSpanWriter()
assert.EqualError(t, err, "archive storage not configured")

-	f.archiveConfig = &mockSessionBuilder{}
+	f.archiveConfig = newMockSessionBuilder(session, nil)
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

_, err = f.CreateArchiveSpanReader()
100 changes: 100 additions & 0 deletions plugin/storage/cassandra/schema/migration/V002toV003.sh
@@ -0,0 +1,100 @@
#!/usr/bin/env bash

# Create a new operation_names_v2 table and copy all data from operation_names table
# Sample usage: KEYSPACE=jaeger_v1_test TIMEOUT=1000 ./plugin/storage/cassandra/schema/migration/v002tov003.sh

set -euo pipefail

function usage {
    >&2 echo "Error: $1"
    >&2 echo ""
    >&2 echo "Usage: KEYSPACE={keyspace} TTL={ttl} $0"
    >&2 echo ""
    >&2 echo "The following parameters can be set via environment:"
    >&2 echo "  KEYSPACE - keyspace"
    >&2 echo ""
    exit 1
}

confirm() {
    read -r -p "${1:-Continue? [y/N]} " response
    case "$response" in
        [yY][eE][sS]|[yY])
            true
            ;;
        *)
            exit 1
            ;;
    esac
}

if [[ ${KEYSPACE} == "" ]]; then
    usage "missing KEYSPACE parameter"
fi

if [[ ${KEYSPACE} =~ [^a-zA-Z0-9_] ]]; then
    usage "invalid characters in KEYSPACE=$KEYSPACE parameter, please use letters, digits or underscores"
fi

keyspace=${KEYSPACE}
old_table=operation_names
new_table=operation_names_v2
cqlsh_cmd=cqlsh

row_count=$(${cqlsh_cmd} -e "select count(*) from $keyspace.$old_table;"|head -4|tail -1| tr -d ' ')
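# (the head/tail pipeline above picks the count value out of cqlsh's tabular output)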

echo "About to copy $row_count rows to new table..."

confirm

${cqlsh_cmd} -e "COPY $keyspace.$old_table (service_name, operation_name) to '$old_table.csv';"

if [[ ! -f ${old_table}.csv ]]; then
    echo "Could not find $old_table.csv. Backup from Cassandra was probably not successful"
    exit 1
fi

csv_rows=$(wc -l ${old_table}.csv | tr -dc '0-9')

if [[ ${row_count} -ne ${csv_rows} ]]; then
    echo "Number of rows in the file ($csv_rows) does not match the number of rows in Cassandra ($row_count)"
    exit 1
fi

echo "Generating data for new table..."
while IFS="," read service_name operation_name; do
    echo "$service_name,,$operation_name"
done < ${old_table}.csv > ${new_table}.csv
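# the empty middle field in each generated line becomes an empty span_kind value (not null; see the COPY options below)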

ttl=$(${cqlsh_cmd} -e "select default_time_to_live from system_schema.tables WHERE keyspace_name='$keyspace' AND table_name='$old_table';"|head -4|tail -1|tr -d ' ')

echo "Creating new table $new_table with ttl: $ttl"

${cqlsh_cmd} -e "CREATE TABLE IF NOT EXISTS $keyspace.$new_table (
service_name text,
span_kind text,
operation_name text,
PRIMARY KEY ((service_name), span_kind, operation_name)
)
WITH compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = $ttl
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800;"

echo "Import data to new table: $keyspace.$new_table from $new_table.csv"

# an empty field in the CSV will be inserted as an empty string instead of null
${cqlsh_cmd} -e "COPY $keyspace.$new_table (service_name, span_kind, operation_name)
    FROM '$new_table.csv'
    WITH NULL='NIL';"

echo "Data from the old table was successfully imported into the new table!"

echo "Before finishing, do you want to delete the old table: $keyspace.$old_table?"
confirm
${cqlsh_cmd} -e "DROP TABLE IF EXISTS $keyspace.$old_table;"
204 changes: 204 additions & 0 deletions plugin/storage/cassandra/schema/v003.cql.tmpl
@@ -0,0 +1,204 @@
--
-- Creates Cassandra keyspace with tables for traces and dependencies.
--
-- Required parameters:
--
-- keyspace
-- name of the keyspace
-- replication
-- replication strategy for the keyspace, such as
-- for prod environments
-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' }
-- for test environments
-- {'class': 'SimpleStrategy', 'replication_factor': '1'}
-- trace_ttl
-- default time to live for trace data, in seconds
-- dependencies_ttl
-- default time to live for dependencies data, in seconds (0 for no TTL)
--
-- Non-configurable settings:
-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/
-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html

CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication};

CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue (
key text,
value_type text,
value_string text,
value_bool boolean,
value_long bigint,
value_double double,
value_binary blob,
);

CREATE TYPE IF NOT EXISTS ${keyspace}.log (
ts bigint,
fields list<frozen<keyvalue>>,
);

CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref (
ref_type text,
trace_id blob,
span_id bigint,
);

CREATE TYPE IF NOT EXISTS ${keyspace}.process (
service_name text,
tags list<frozen<keyvalue>>,
);

-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID.
-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks different in CQLSH "describe table".
-- start_time is bigint instead of timestamp as we require microsecond precision
CREATE TABLE IF NOT EXISTS ${keyspace}.traces (
trace_id blob,
span_id bigint,
span_hash bigint,
parent_id bigint,
operation_name text,
flags int,
start_time bigint,
duration bigint,
tags list<frozen<keyvalue>>,
logs list<frozen<log>>,
refs list<frozen<span_ref>>,
process frozen<process>,
PRIMARY KEY (trace_id, span_id, span_hash)
)
WITH compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.service_names (
service_name text,
PRIMARY KEY (service_name)
)
WITH compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names_v2 (
service_name text,
span_kind text,
operation_name text,
PRIMARY KEY ((service_name), span_kind, operation_name)
)
WITH compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
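-- Note: the partition key is still (service_name), so all operations for a service stay in one
-- partition; span_kind and operation_name are clustering columns, which keeps rows sorted and
-- allows an optional "AND span_kind = ?" restriction when querying.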

-- index of trace IDs by service + operation names, sorted by span start_time.
CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index (
service_name text,
operation_name text,
start_time bigint,
trace_id blob,
PRIMARY KEY ((service_name, operation_name), start_time)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index (
service_name text,
bucket int,
start_time bigint,
trace_id blob,
PRIMARY KEY ((service_name, bucket), start_time)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index (
service_name text, // service name
operation_name text, // operation name, or blank for queries without span name
bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour
duration bigint, // span duration, in microseconds
start_time bigint,
trace_id blob,
PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id)
) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- a bucketing strategy may have to be added for tag queries
-- we can make this table even better by adding a timestamp to it
CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index (
service_name text,
tag_key text,
tag_value text,
start_time bigint,
trace_id blob,
span_id bigint,
PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id)
)
WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TYPE IF NOT EXISTS ${keyspace}.dependency (
parent text,
child text,
call_count bigint,
source text,
);

-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data
CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 (
ts_bucket timestamp,
ts timestamp,
dependencies list<frozen<dependency>>,
PRIMARY KEY (ts_bucket, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND default_time_to_live = ${dependencies_ttl};