Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: schedule_job and interval #231

Merged
merged 27 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7e9d8d8
test: for builtin functions
discord9 Sep 2, 2022
20913e9
test: expect fail for `datetime()`
discord9 Sep 2, 2022
89a1384
feat: add `interval()` fn(WIP)
discord9 Sep 2, 2022
8013b3e
feat: `interval()` fn in builtin(UNTEST)
discord9 Sep 2, 2022
0c78d29
refactor: move `py_vec_obj_to_array` to util.rs
discord9 Sep 2, 2022
48a2e75
style: fmt
discord9 Sep 2, 2022
d2b5c58
test: simple `interval()` cases
discord9 Sep 2, 2022
d5551b6
test: `interval()` with `last()`&`first()`
discord9 Sep 2, 2022
3b99aec
doc: `ts` param of `interval()`
discord9 Sep 2, 2022
2afc465
log: common_telemetry for logging in script crate
discord9 Sep 5, 2022
4345196
doc: corrsponding test fn for each .ron file
discord9 Sep 5, 2022
dd6adfa
feat: change to`mpsc` for schedule_job
discord9 Sep 5, 2022
c63a178
test: schedule_job
discord9 Sep 5, 2022
e4fe0a4
dep: rm rustpython dep in common-function
discord9 Sep 5, 2022
d914991
refactor: mv `schedule_job` into `Script` trait
discord9 Sep 6, 2022
b4dd96e
test: change to use `interval` to sample datapoint
discord9 Sep 6, 2022
25d87e1
feat: add gen_none_array for generate None Array
discord9 Sep 7, 2022
ca95d23
feat: impl Missing value for `prev`&`next`
discord9 Sep 7, 2022
584cfae
test: `sum(prev(values))`
discord9 Sep 7, 2022
1d98727
doc: add comment for why not support Float16 in `prev()`
discord9 Sep 7, 2022
32d81c2
feat: add `interval` in py side mock module
discord9 Sep 8, 2022
0941739
style: cargo fmt
discord9 Sep 8, 2022
0414f76
refactor: according to comments
discord9 Sep 13, 2022
ca7ee2f
refactor: extract `apply_interval_function`
discord9 Sep 13, 2022
d09025a
style: cargo fmt
discord9 Sep 13, 2022
0737eec
refactor: remove `schedule()`
discord9 Sep 13, 2022
9b2d17f
style: cargo fmt
discord9 Sep 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 11 additions & 9 deletions component/script/python/example/calc_rv.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ def as_table(kline: list):
"rv_60d",
"rv_90d",
"rv_180d"
],
sql="select open_time, close from k_line")
])
def calc_rvs(open_time, close):
from greptime import vector, log, prev, sqrt, datetime, pow, sum
from greptime import vector, log, prev, sqrt, datetime, pow, sum, last
import greptime as g
def calc_rv(close, open_time, time, interval):
mask = (open_time < time) & (open_time > time - interval)
close = close[mask]
open_time = open_time[mask]
close = g.interval(open_time, close, datetime("10m"), lambda x:last(x))

avg_time_interval = (open_time[-1] - open_time[0])/(len(open_time)-1)
ref = log(close/prev(close))
Expand All @@ -60,10 +62,10 @@ def calc_rv(close, open_time, time, interval):
# how to get env var,
# maybe through accessing scope and serde then send to remote?
timepoint = open_time[-1]
rv_7d = calc_rv(close, open_time, timepoint, datetime("7d"))
rv_15d = calc_rv(close, open_time, timepoint, datetime("15d"))
rv_30d = calc_rv(close, open_time, timepoint, datetime("30d"))
rv_60d = calc_rv(close, open_time, timepoint, datetime("60d"))
rv_90d = calc_rv(close, open_time, timepoint, datetime("90d"))
rv_180d = calc_rv(close, open_time, timepoint, datetime("180d"))
rv_7d = vector([calc_rv(close, open_time, timepoint, datetime("7d"))])
rv_15d = vector([calc_rv(close, open_time, timepoint, datetime("15d"))])
rv_30d = vector([calc_rv(close, open_time, timepoint, datetime("30d"))])
rv_60d = vector([calc_rv(close, open_time, timepoint, datetime("60d"))])
rv_90d = vector([calc_rv(close, open_time, timepoint, datetime("90d"))])
rv_180d = vector([calc_rv(close, open_time, timepoint, datetime("180d"))])
return rv_7d, rv_15d, rv_30d, rv_60d, rv_90d, rv_180d
22 changes: 11 additions & 11 deletions component/script/python/example/kline.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231300,
"open_time": 300,
"open": "10107",
"high": "10109.34",
"low": "10106.71",
Expand All @@ -16,7 +16,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231360,
"open_time": 900,
"open": "10106.79",
"high": "10109.27",
"low": "10105.92",
Expand All @@ -25,7 +25,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231420,
"open_time": 1200,
"open": "10106.09",
"high": "10108.75",
"low": "10104.66",
Expand All @@ -34,7 +34,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231480,
"open_time": 1800,
"open": "10108.73",
"high": "10109.52",
"low": "10106.07",
Expand All @@ -43,7 +43,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231540,
"open_time": 2400,
"open": "10106.38",
"high": "10109.48",
"low": "10104.81",
Expand All @@ -52,7 +52,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231600,
"open_time": 3000,
"open": "10106.95",
"high": "10109.48",
"low": "10106.6",
Expand All @@ -61,7 +61,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231660,
"open_time": 3600,
"open": "10107.55",
"high": "10109.28",
"low": "10104.68",
Expand All @@ -70,7 +70,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231720,
"open_time": 4200,
"open": "10104.68",
"high": "10109.18",
"low": "10104.14",
Expand All @@ -79,7 +79,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231780,
"open_time": 4800,
"open": "10108.8",
"high": "10117.36",
"low": "10108.8",
Expand All @@ -88,7 +88,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231840,
"open_time": 5400,
"open": "10115.96",
"high": "10119.19",
"low": "10115.96",
Expand All @@ -97,7 +97,7 @@
{
"symbol": "BTCUSD",
"period": "1",
"open_time": 1581231900,
"open_time": 6000,
"open": "10117.08",
"high": "10120.73",
"low": "10116.96",
Expand Down
2 changes: 1 addition & 1 deletion component/script/python/greptime/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .greptime import coprocessor, copr
from .greptime import vector, log, prev, sqrt, pow, datetime, sum
from .greptime import vector, log, prev, next, first, last, sqrt, pow, datetime, sum, interval
from .mock import mock_tester
from .cfg import set_conn_addr, get_conn_addr
40 changes: 16 additions & 24 deletions component/script/python/greptime/greptime.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,42 +89,34 @@ def datatype(self):
def filter(self, lst_bool):
return self[lst_bool]

def last(lst):
return lst[-1]

def first(lst):
return lst[0]

def prev(lst):
ret = np.zeros(len(lst))
ret[1:] = lst[0:-1]
ret[0] = nan
return ret

def next(lst):
ret = np.zeros(len(lst))
ret[:-1] = lst[1:]
ret[-1] = nan
return ret

def query(sql: str):
pass


def interval(arr: list, duration: int, fill, step: None | int = None, explicitOffset=False):
def interval(ts: vector, arr: vector, duration: int, func):
"""
Note that this is a mock function with same functionailty to the actual Python Coprocessor
`arr` is a vector of integral or temporal type.

`duration` is the length of sliding window

`step` being the length when sliding window take a step

`fill` indicate how to fill missing value:
- "prev": use previous
- "post": next
- "linear": linear interpolation, if not possible to interpolate certain types, fallback to prev
- "null": use null
- "none": do not interpolate
"""
if step is None:
step = duration

tot_len = int(np.ceil(len(arr) // step))
slices = np.zeros((tot_len, int(duration)))
for idx, start in enumerate(range(0, len(arr), step)):
slices[idx] = arr[start:(start + duration)]
return slices
start = np.min(ts)
end = np.max(ts)
masks = [(ts >= i) & (ts <= (i+duration)) for i in range(start, end, duration)]
lst_res = [func(arr[mask]) for mask in masks]
return lst_res


def factor(unit: str) -> int:
Expand Down
2 changes: 1 addition & 1 deletion component/script/python/greptime/mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from typing import Any
import numpy as np
from .greptime import i32,i64,f32,f64, vector, interval, query, prev, datetime, log, sum, sqrt, pow, nan, copr, coprocessor
from .greptime import i32,i64,f32,f64, vector, interval, prev, datetime, log, sum, sqrt, pow, nan, copr, coprocessor

import inspect
import functools
Expand Down
15 changes: 10 additions & 5 deletions component/script/python/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ def get_db(req:str):
return requests.get("http://{}{}".format(get_conn_addr(), req))

if __name__ == "__main__":
with open("component/script/python/example/kline.json", "r") as kline_file:
kline = json.load(kline_file)
table = as_table(kline["result"])
close = table["close"]
open_time = table["open_time"]
env = {"close":close, "open_time": open_time}

res = mock_tester(calc_rvs, env=env)
print("Mock result:", [i[0] for i in res])
exit()
if len(sys.argv)!=2:
raise Exception("Expect only one address as cmd's args")
set_conn_addr(sys.argv[1])
Expand All @@ -42,11 +52,6 @@ def get_db(req:str):
open_time = table["open_time"]
init_table(close, open_time)

# print(repr(close), repr(open_time))
# print("calc_rv:", calc_rv(close, open_time, open_time[-1]+datetime("10m"), datetime("7d")))
env = {"close":close, "open_time": open_time}
# print("env:", env)
print("Mock result:", mock_tester(calc_rvs, env=env))
real = calc_rvs()
print(real)
try:
Expand Down
6 changes: 0 additions & 6 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ num = "0.4"
num-traits = "0.2"
once_cell = "1.10"
paste = "1.0"
rustpython-ast = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
rustpython-bytecode = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
rustpython-compiler = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
rustpython-compiler-core = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
rustpython-parser = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
rustpython-vm = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
snafu = { version = "0.7", features = ["backtraces"] }
statrs = "0.15"

Expand Down
5 changes: 5 additions & 0 deletions src/script/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ python = [
"dep:rustpython-compiler-core",
"dep:rustpython-bytecode",
"dep:rustpython-ast",
"dep:paste"
]

[dependencies]
Expand All @@ -23,6 +24,7 @@ common-error = {path = "../common/error"}
common-function = { path = "../common/function" }
common-query = {path = "../common/query"}
common-recordbatch = {path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
console = "0.15"
datafusion = {git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", optional = true}
datafusion-common = {git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2"}
Expand All @@ -38,8 +40,11 @@ rustpython-compiler = {git = "https://github.com/RustPython/RustPython", optiona
rustpython-compiler-core = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
rustpython-parser = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
rustpython-vm = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
paste = { version = "1.0", optional = true}
snafu = {version = "0.7", features = ["backtraces"]}
sql = { path = "../sql" }
tokio = { version = "1.0", features = ["full"] }


[dev-dependencies]
catalog = { path = "../catalog" }
Expand Down
1 change: 1 addition & 0 deletions src/script/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(iterator_try_reduce)]
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
pub mod engine;
#[cfg(feature = "python")]
pub mod python;
Loading