Skip to content

Commit

Permalink
feat: add API to set time unit in a metric (#36)
Browse files Browse the repository at this point in the history
* feat: add API to set time unit in a metric

This will allow reusing the same client for writing data to different tables with different time units.

* fix: upgrade deps in rebar.lock to match rebar.config
  • Loading branch information
SergeTupchiy committed Jan 26, 2024
1 parent a2f5de8 commit 519b7c2
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 55 deletions.
20 changes: 10 additions & 10 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
{"1.2.0",
[{<<"acceptor_pool">>,{pkg,<<"acceptor_pool">>,<<"1.0.0">>},1},
{<<"chatterbox">>,{pkg,<<"ts_chatterbox">>,<<"0.13.0">>},1},
{<<"chatterbox">>,{pkg,<<"ts_chatterbox">>,<<"0.15.1">>},1},
{<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},1},
{<<"ecpool">>,
{git,"https://github.com/emqx/ecpool",
{ref,"d01b8cb99af90bc177eeeabe29075133db878fb3"}},
{ref,"a9719f2d4ae9c778a8ca59f1ec9f644bcf7874a7"}},
0},
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},1},
{<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.16.0">>},0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},2}]}.
{<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},2}]}.
[
{pkg_hash,[
{<<"acceptor_pool">>, <<"43C20D2ACAE35F0C2BCD64F9D2BDE267E459F0F3FD23DAB26485BF518C281B21">>},
{<<"chatterbox">>, <<"6F059D97BCAA758B8EA6FFFE2B3B81362BD06B639D3EA2BB088335511D691EBF">>},
{<<"chatterbox">>, <<"5CAC4D15DD7AD61FC3C4415CE4826FC563D4643DEE897A558EC4EA0B1C835C9C">>},
{<<"ctx">>, <<"8FF88B70E6400C4DF90142E7F130625B82086077A45364A78D208ED3ED53C7FE">>},
{<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>},
{<<"grpcbox">>, <<"B83F37C62D6EECA347B77F9B1EC7E9F62231690CDFEB3A31BE07CD4002BA9C82">>},
{<<"hpack">>, <<"17670F83FF984AE6CD74B1C456EDDE906D27FF013740EE4D9EFAA4F1BF999633">>}]},
{<<"grpcbox">>, <<"6E040AB3EF16FE699FFB513B0EF8E2E896DA7B18931A1EF817143037C454BCCE">>},
{<<"hpack">>, <<"2461899CC4AB6A0EF8E970C1661C5FC6A52D3C25580BC6DD204F84CE94669926">>}]},
{pkg_hash_ext,[
{<<"acceptor_pool">>, <<"0CBCD83FDC8B9AD2EEE2067EF8B91A14858A5883CB7CD800E6FCD5803E158788">>},
{<<"chatterbox">>, <<"B93D19104D86AF0B3F2566C4CBA2A57D2E06D103728246BA1AC6C3C0FF010AA7">>},
{<<"chatterbox">>, <<"4F75B91451338BC0DA5F52F3480FA6EF6E3A2AEECFC33686D6B3D0A0948F31AA">>},
{<<"ctx">>, <<"A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2">>},
{<<"gproc">>, <<"580ADAFA56463B75263EF5A5DF4C86AF321F68694E7786CB057FD805D1E2A7DE">>},
{<<"grpcbox">>, <<"294DF743AE20A7E030889F00644001370A4F7CE0121F3BBDAF13CF3169C62913">>},
{<<"hpack">>, <<"06F580167C4B8B8A6429040DF36CC93BBA6D571FAEAEC1B28816523379CBB23A">>}]}
{<<"grpcbox">>, <<"4A3B5D7111DAABC569DC9CBD9B202A3237D81C80BF97212FBC676832CB0CEB17">>},
{<<"hpack">>, <<"D6137D7079169D8C485C6962DFE261AF5B9EF60FBC557344511C1E65E3D95FB0">>}]}
].
55 changes: 19 additions & 36 deletions src/greptimedb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,23 @@
-export([start_client/1, stop_client/1, write_batch/2, write/3, write_stream/1,
async_write/4, async_write_batch/3, is_alive/1, is_alive/2, ddl/1]).

-export_type([metric/0, point/0, timeunit/0]).

-type table() :: atom() | binary() | list().
-type dbname() :: atom() | binary() | list().
-type timeunit() :: ns | us| ms | s | nanosecond | microsecond | millisecond | second.
-type metric() :: table()
| {dbname(), table()}
| #{dbname => dbname(), table := table(), timeunit => timeunit()}.
-type point() :: #{tags => map(),
fields => map(),
timestamp => integer()}.

-spec start_client(list()) ->
{ok, Client :: map()} |
{error, {already_started, Client :: map()}} |
{error, Reason :: term()}.

start_client(Options0) ->
Pool = proplists:get_value(pool, Options0),
Options = lists:keydelete(protocol, 1, lists:keydelete(pool, 1, Options0)),
Expand All @@ -41,30 +54,15 @@ start_client(Options0) ->
%% @doc Write points to the metric table, return the result.
-spec write(Client, Metric, Points) -> {ok, term()} | {error, term()}
when Client :: map(),
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()}.
Metric :: metric(),
Points :: [point()].
write(Client, Metric, Points) ->
write_batch(Client, [{Metric, Points}]).

%% @doc Write a batch of data points to the database, return the result.
-spec write_batch(Client, MetricAndPoints) -> {ok, term()} | {error, term()}
when Client :: map(),
MetricAndPoints :: [MetricAndPoint],
MetricAndPoint :: {Metric, Points},
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()}.
MetricAndPoints :: [{metric(), [point()]}].
write_batch(Client, MetricAndPoints) ->
try
Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints),
Expand All @@ -89,31 +87,16 @@ write_stream(Client) ->
%% @doc Send an async request to write points to the metric table. The callback is evaluated when an error happens or response is received.
-spec async_write(Client, Metric, Points, ResultCallback) -> ok | {error, term()}
when Client :: map(),
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()},
Metric :: metric(),
Points :: [point()],
ResultCallback :: {function(), list()}.
async_write(Client, Metric, Points, ResultCallback) ->
async_write_batch(Client, [{Metric, Points}], ResultCallback).

%% @doc Send a batch of async request. The callback is evaluated when an error happens or response is received.
-spec async_write_batch(Client, MetricAndPoints, ResultCallback) -> ok | {error, term()}
when Client :: map(),
MetricAndPoints :: [MetricAndPoint],
MetricAndPoint :: {Metric, Points},
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()},
MetricAndPoints :: [{metric(), [point()]}],
ResultCallback :: {function(), list()}.
async_write_batch(Client, MetricAndPoints, ResultCallback) ->
Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints),
Expand Down
32 changes: 23 additions & 9 deletions src/greptimedb_encoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,44 @@ insert_requests(#{cli_opts := Options} = _Client, [], DbName, Inserts) ->
#{dbname => DbName, authorization => #{auth_scheme => Scheme}}
end,
#{header => Header, request => {inserts, #{inserts => Inserts}}};
insert_requests(#{cli_opts := Options} = Client, [{Table, Points} | T], PrevDbName, Inserts) ->
{DbName, Insert} = insert_request(Options, Table, Points),
insert_requests(#{cli_opts := Options} = Client, [{Metric, Points} | T], PrevDbName, Inserts) ->
{DbName, Insert} = insert_request(Options, metric(Options, Metric), Points),
case PrevDbName of
unknown ->
insert_requests(Client, T, DbName, [Insert | Inserts]);
Name when Name == DbName ->
insert_requests(Client, T, Name, [Insert | Inserts])
end.

insert_request(Options, {DbName, Table}, Points) ->
Timeunit = proplists:get_value(timeunit, Options, ms),
insert_request(_Options, #{dbname := DbName, table := Table, timeunit := Timeunit}, Points) ->
RowCount = length(Points),
Columns =
lists:map(fun(Column) -> pad_null_mask(Column, RowCount) end, collect_columns(Timeunit, Points)),
Columns = lists:map(fun(Column) -> pad_null_mask(Column, RowCount) end,
collect_columns(Timeunit, Points)),
{DbName,
#{table_name => Table,
columns => Columns,
row_count => RowCount}};
insert_request(Options, Table, Points) ->
insert_request(Options, {?DEFAULT_DBNAME, Table}, Points).
row_count => RowCount}}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

metric(Options, Metric) ->
metric_with_default(default_metric(Options), Metric).

default_metric(Options) ->
#{dbname => ?DEFAULT_DBNAME,
timeunit => proplists:get_value(timeunit, Options, ms)}.

%% table is required
metric_with_default(Default, #{table := _} = Metric) ->
maps:merge(Default, Metric);
%% backward compatibility
metric_with_default(Default, {DbName, Table}) ->
Default#{dbname => DbName, table => Table};
metric_with_default(Default, Table) when is_atom(Table); is_list(Table); is_binary(Table) ->
Default#{table => Table}.

collect_columns(Timeunit, Points) ->
collect_columns(Timeunit, Points, []).

Expand Down
21 changes: 21 additions & 0 deletions test/greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ all() ->
[t_write,
t_write_stream,
t_insert_requests,
t_insert_requests_with_timeunit,
t_write_failure,
t_write_batch,
t_bench_perf,
Expand Down Expand Up @@ -107,6 +108,26 @@ t_insert_requests(_) ->
end,
ok.

t_insert_requests_with_timeunit(_) ->
TsNano = 1705946037724448346,
Points = [#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => "0",
<<"device">> => <<"NO.1">>,
<<"region">> => <<"hangzhou">>},
timestamp => TsNano}],
AuthInfo = {basic, #{username => "test", password => "test"}},
Client = #{cli_opts => [{auth, AuthInfo}, {timeunit, second}]},
Metric = #{table => "Test", timeunit => nanosecond},
Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]),
#{header := #{dbname := _DbName, authorization := _Auth},
request := {inserts, #{inserts := [#{columns := Columns}]}}} = Request,
{value, TimestampColumn} =
lists:search(fun(C) -> maps:get(column_name, C) == <<"greptime_timestamp">> end, Columns),
?assertEqual([TsNano], maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))).

t_write_failure(_) ->
Metric = <<"temperatures">>,
Points =
Expand Down

0 comments on commit 519b7c2

Please sign in to comment.