Create materialized view on external table #1407

Closed
Tracked by #3
dai-chen opened this issue Mar 7, 2023 · 3 comments

@dai-chen
Collaborator

dai-chen commented Mar 7, 2023

Is your feature request related to a problem?

In #1379, we verified the query acceleration capability and the maintenance of the secondary index. This issue focuses on the materialized view implementation.

What solution would you like?

We will work on the following tasks for the prototype and demo, and for development afterwards:

  1. Extend Spark grammar with CREATE MATERIALIZED VIEW
  2. Refresh MV data by Spark streaming job
  3. Add tumbling window function
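
As a rough illustration of what the tumbling window function in item 3 has to compute, the sketch below assigns a timestamp to its fixed-size, non-overlapping window. It is plain Python rather than the Spark implementation; the function name `tumble` and the choice of the Unix epoch as the alignment origin are assumptions made for this example.

```python
from datetime import datetime, timedelta
from typing import Tuple

# Assumption: windows are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)


def tumble(event_time: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return the [start, end) tumbling window that contains event_time."""
    # Number of whole windows that have elapsed since the epoch.
    index = (event_time - EPOCH) // size
    start = EPOCH + index * size
    return start, start + size


if __name__ == "__main__":
    start, end = tumble(datetime(2023, 3, 13, 16, 30, 42), timedelta(minutes=1))
    print(start, end)
```

Each timestamp belongs to exactly one window, which is why a GROUP BY on the window yields one row per time bucket.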

What alternatives have you considered?

Alternative solution and reference for this work:

  1. Apache Carbondata: https://carbondata.apache.org/index.html

Do you have any additional context?

Please find more details in:

  1. [RFC] Automatic Workload-Driven Query Acceleration by OpenSearch opensearch-spark#128
  2. [RFC] OpenSearch and Apache Spark Integration opensearch-spark#4
@acarbonetto
Collaborator

Do we have a standard for creating "internal" or "plugin" indices?

Store materialized view data in OpenSearch index

@dai-chen
Collaborator Author

Do we have a standard for creating "internal" or "plugin" indices?

Store materialized view data in OpenSearch index

For internal indices, OpenSearch provides hidden indices and system indices. In our case, MV data may be stored in a regular index.

@dai-chen
Collaborator Author

dai-chen commented Mar 13, 2023

Proof of Concepts

Setup

Please refer to #1379 (comment)

Cleanup

#Drop tables
DROP TABLE maximus_alb_logs;
DROP TABLE alb_logs_metrics;

#Clean up the MV and indexes created previously
rm -rf spark-warehouse/alb_logs_metrics
rm -rf spark-warehouse/indexes/*

#Remove _delta folder on S3

#The checkpoint folder location is hardcoded for now
rm -rf /tmp/delta/_checkpoints/*

Test with ALB Logs

General Case: MV with Aggregation for Log-to-Metric Transformation

#Create temp table for simulating ingestion
CREATE TABLE IF NOT EXISTS alb_logs_temp
(
  type string,
  time timestamp,
  elb string,
  client_ip string,
  client_port int,
  target_ip string,
  target_port int,
  request_processing_time double,
  target_processing_time double,
  response_processing_time double,
  elb_status_code int,
  target_status_code string,
  received_bytes bigint,
  sent_bytes bigint,
  request_verb string,
  request_url string,
  request_proto string,
  user_agent string,
  ssl_cipher string,
  ssl_protocol string,
  target_group_arn string,
  trace_id string,
  domain_name string,
  chosen_cert_arn string,
  matched_rule_priority string,
  request_creation_time string,
  actions_executed string,
  redirect_url string,
  lambda_error_reason string,
  target_port_list string,
  target_status_code_list string,
  classification string,
  classification_reason string
)
USING PARQUET
LOCATION "s3a://maximus-alb-logs/";

#Create Maximus table with auto refresh enabled
CREATE EXTERNAL TABLE IF NOT EXISTS maximus_alb_logs
(
  type string,
  time timestamp,
  elb string,
  client_ip string,
  client_port int,
  target_ip string,
  target_port int,
  request_processing_time double,
  target_processing_time double,
  response_processing_time double,
  elb_status_code int,
  target_status_code string,
  received_bytes bigint,
  sent_bytes bigint,
  request_verb string,
  request_url string,
  request_proto string,
  user_agent string,
  ssl_cipher string,
  ssl_protocol string,
  target_group_arn string,
  trace_id string,
  domain_name string,
  chosen_cert_arn string,
  matched_rule_priority string,
  request_creation_time string,
  actions_executed string,
  redirect_url string,
  lambda_error_reason string,
  target_port_list string,
  target_status_code_list string,
  classification string,
  classification_reason string
)
USING DELTA
LOCATION "s3a://maximus-alb-logs/"
TBLPROPERTIES ('auto_refresh'='true');

#Create MV for total count with 1 minute window
CREATE MATERIALIZED VIEW alb_logs_metrics
AS
SELECT
  window.start AS startTime,
  window.end AS endTime,
  COUNT(*) AS totalCount
FROM maximus_alb_logs
GROUP BY TUMBLE(time, '1 Minutes');

#Add 2 identical records at 16:30:00 by running this INSERT twice
INSERT INTO alb_logs_temp
VALUES
(
  "https", --type
  CAST("2023-03-13 16:30:00.000000" AS TIMESTAMP), --time
  "app/elb1",      --elb
  "10.212.10.100", --client_ip
  41950,           --client_port
  "10.212.20.1",   --target_ip
  443,   --target_port
  0.002, --request_processing_time
  0.046, --target_processing_time
  0.0,   --response_processing_time
  403,   --elb_status_code
  "403", --target_status_code
  211,   --received_bytes
  364,   --sent_bytes
  "GET", --request_verb
  "https://192.168.1.100:443/solr/", --request_url
  NULL,  --request_proto
  NULL,  --user_agent
  NULL,  --ssl_cipher
  NULL,  --ssl_protocol
  NULL,  --target_group_arn
  NULL,  --trace_id
  NULL,  --domain_name
  NULL,  --chosen_cert_arn
  NULL,  --matched_rule_priority
  NULL,  --request_creation_time
  NULL,  --actions_executed
  NULL,  --redirect_url
  NULL,  --lambda_error_reason
  NULL,  --target_port_list
  NULL,  --target_status_code_list
  NULL,  --classification
  NULL   --classification_reason
);

#No records in MV yet because the 16:30 window has not fallen behind the watermark
SELECT * FROM alb_logs_metrics;
(None)

#Add a record at 16:32:00, which advances the watermark and triggers output of the first 2 records
INSERT INTO alb_logs_temp
VALUES
(
  "https", --type
  CAST("2023-03-13 16:32:00.000000" AS TIMESTAMP), --time
  "app/elb1",      --elb
  "10.212.10.100", --client_ip
  41950,           --client_port
  "10.212.20.1",   --target_ip
  443,   --target_port
  0.002, --request_processing_time
  0.046, --target_processing_time
  0.0,   --response_processing_time
  403,   --elb_status_code
  "403", --target_status_code
  211,   --received_bytes
  364,   --sent_bytes
  "GET", --request_verb
  "https://192.168.1.100:443/solr/", --request_url
  NULL,  --request_proto
  NULL,  --user_agent
  NULL,  --ssl_cipher
  NULL,  --ssl_protocol
  NULL,  --target_group_arn
  NULL,  --trace_id
  NULL,  --domain_name
  NULL,  --chosen_cert_arn
  NULL,  --matched_rule_priority
  NULL,  --request_creation_time
  NULL,  --actions_executed
  NULL,  --redirect_url
  NULL,  --lambda_error_reason
  NULL,  --target_port_list
  NULL,  --target_status_code_list
  NULL,  --classification
  NULL   --classification_reason
);

SELECT * FROM alb_logs_metrics;
2023-03-13 16:30:00    2023-03-13 16:31:00    2

#Add another one at 16:35:00
INSERT INTO alb_logs_temp
VALUES
(
  "https", --type
  CAST("2023-03-13 16:35:00.000000" AS TIMESTAMP), --time
  "app/elb1",      --elb
  "10.212.10.100", --client_ip
  41950,           --client_port
  "10.212.20.1",   --target_ip
  443,   --target_port
  0.002, --request_processing_time
  0.046, --target_processing_time
  0.0,   --response_processing_time
  403,   --elb_status_code
  "403", --target_status_code
  211,   --received_bytes
  364,   --sent_bytes
  "GET", --request_verb
  "https://192.168.1.100:443/solr/", --request_url
  NULL,  --request_proto
  NULL,  --user_agent
  NULL,  --ssl_cipher
  NULL,  --ssl_protocol
  NULL,  --target_group_arn
  NULL,  --trace_id
  NULL,  --domain_name
  NULL,  --chosen_cert_arn
  NULL,  --matched_rule_priority
  NULL,  --request_creation_time
  NULL,  --actions_executed
  NULL,  --redirect_url
  NULL,  --lambda_error_reason
  NULL,  --target_port_list
  NULL,  --target_status_code_list
  NULL,  --classification
  NULL   --classification_reason
);

SELECT * FROM alb_logs_metrics;
2023-03-13 16:30:00    2023-03-13 16:31:00    2
2023-03-13 16:32:00    2023-03-13 16:33:00    1
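
The delayed output above follows from watermark-gated emission: a window's count is written only once the watermark has reached the window's end. The sketch below is a simplified, single-process Python model of that behavior, not the Spark streaming job itself. The class name `TumblingCountView` and the 1 minute watermark delay are assumptions (the issue does not state the delay); the delay was chosen because it is consistent with the results shown.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)  # window size used by the MV in this issue
DELAY = timedelta(minutes=1)   # assumption: watermark delay is not given in the issue
EPOCH = datetime(1970, 1, 1)


class TumblingCountView:
    """Toy model of an append-only MV over a 1 minute tumbling window."""

    def __init__(self):
        self.open_counts = Counter()  # window start -> running count
        self.watermark = None         # max event time seen minus DELAY
        self.rows = []                # emitted (start, end, count) rows

    def insert(self, event_time: datetime) -> None:
        start = EPOCH + ((event_time - EPOCH) // WINDOW) * WINDOW
        # Drop records whose window has already been finalized.
        if self.watermark is not None and start + WINDOW <= self.watermark:
            return
        self.open_counts[start] += 1
        candidate = event_time - DELAY
        if self.watermark is None or candidate > self.watermark:
            self.watermark = candidate
        # Emit every open window whose end is at or behind the watermark.
        for s in sorted(self.open_counts):
            if s + WINDOW <= self.watermark:
                self.rows.append((s, s + WINDOW, self.open_counts.pop(s)))


if __name__ == "__main__":
    view = TumblingCountView()
    for minute in (30, 30, 32, 35):
        view.insert(datetime(2023, 3, 13, 16, minute))
        print(minute, view.rows)
```

In this model the 16:35 record stays in an open window until a later record moves the watermark to 16:36 or beyond, which matches the two rows returned by the last SELECT.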

Special Case: MV as Covering Index (Streaming Raw Data into OpenSearch Index)

#Create MV to simply load all data
CREATE MATERIALIZED VIEW alb_logs_raw
AS
SELECT * FROM maximus_alb_logs;

#Add records
INSERT INTO alb_logs_temp
VALUES
(
  "https", --type
  CAST("2023-03-13 16:30:00.000000" AS TIMESTAMP), --time
  "app/elb1",      --elb
  "10.212.10.100", --client_ip
  41950,           --client_port
  "10.212.20.1",   --target_ip
  443,   --target_port
  0.002, --request_processing_time
  0.046, --target_processing_time
  0.0,   --response_processing_time
  403,   --elb_status_code
  "403", --target_status_code
  211,   --received_bytes
  364,   --sent_bytes
  "GET", --request_verb
  "https://192.168.1.100:443/solr/", --request_url
  NULL,  --request_proto
  NULL,  --user_agent
  NULL,  --ssl_cipher
  NULL,  --ssl_protocol
  NULL,  --target_group_arn
  NULL,  --trace_id
  NULL,  --domain_name
  NULL,  --chosen_cert_arn
  NULL,  --matched_rule_priority
  NULL,  --request_creation_time
  NULL,  --actions_executed
  NULL,  --redirect_url
  NULL,  --lambda_error_reason
  NULL,  --target_port_list
  NULL,  --target_status_code_list
  NULL,  --classification
  NULL   --classification_reason
);

#Because this is not a streaming aggregation, data is output to the MV immediately
SELECT * FROM alb_logs_raw;
https    2023-03-13 16:30:00    app/elb1    10.212.10.100    41950    10.212.20.1    443    0.002    0.046    0.0    403    403    211    364    GET    https://192.168.1.100:443/solr/    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
Time taken: 0.289 seconds, Fetched 1 row(s)

#Add another record
INSERT INTO alb_logs_temp
VALUES
(
  "https", --type
  CAST("2023-03-13 16:32:00.000000" AS TIMESTAMP), --time
  "app/elb1",      --elb
  "10.212.10.100", --client_ip
  41950,           --client_port
  "10.212.20.1",   --target_ip
  443,   --target_port
  0.002, --request_processing_time
  0.046, --target_processing_time
  0.0,   --response_processing_time
  403,   --elb_status_code
  "403", --target_status_code
  211,   --received_bytes
  364,   --sent_bytes
  "GET", --request_verb
  "https://192.168.1.100:443/solr/", --request_url
  NULL,  --request_proto
  NULL,  --user_agent
  NULL,  --ssl_cipher
  NULL,  --ssl_protocol
  NULL,  --target_group_arn
  NULL,  --trace_id
  NULL,  --domain_name
  NULL,  --chosen_cert_arn
  NULL,  --matched_rule_priority
  NULL,  --request_creation_time
  NULL,  --actions_executed
  NULL,  --redirect_url
  NULL,  --lambda_error_reason
  NULL,  --target_port_list
  NULL,  --target_status_code_list
  NULL,  --classification
  NULL   --classification_reason
);

SELECT * FROM alb_logs_raw;
https    2023-03-13 16:30:00    app/elb1    10.212.10.100    41950    10.212.20.1    443    0.002    0.046    0.0    403    403    211    364    GET    https://192.168.1.100:443/solr/    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
https    2023-03-13 16:32:00    app/elb1    10.212.10.100    41950    10.212.20.1    443    0.002    0.046    0.0    403    403    211    364    GET    https://192.168.1.100:443/solr/    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
Time taken: 0.216 seconds, Fetched 2 row(s)
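
The test records in both cases differ only in the time column, so the repeated INSERT statements could be generated rather than copied. The helper below is a hypothetical convenience for reproducing the test (the name `alb_insert_sql` is not part of the prototype); the literal values and the 17 trailing NULL columns are taken from the statements above.

```python
from typing import List

# Values for the first 16 columns, type through request_url, as used above.
# The time column is filled in per call.
LEADING_VALUES: List[str] = [
    '"https"',            # type
    "{time}",             # time (placeholder)
    '"app/elb1"',         # elb
    '"10.212.10.100"',    # client_ip
    "41950",              # client_port
    '"10.212.20.1"',      # target_ip
    "443",                # target_port
    "0.002",              # request_processing_time
    "0.046",              # target_processing_time
    "0.0",                # response_processing_time
    "403",                # elb_status_code
    '"403"',              # target_status_code
    "211",                # received_bytes
    "364",                # sent_bytes
    '"GET"',              # request_verb
    '"https://192.168.1.100:443/solr/"',  # request_url
]
TRAILING_NULLS = 17  # request_proto through classification_reason


def alb_insert_sql(event_time: str, table: str = "alb_logs_temp") -> str:
    """Build one INSERT statement for a test record at event_time."""
    cast = f'CAST("{event_time}" AS TIMESTAMP)'
    values = [cast if v == "{time}" else v for v in LEADING_VALUES]
    values += ["NULL"] * TRAILING_NULLS
    return f"INSERT INTO {table}\nVALUES\n(\n  " + ",\n  ".join(values) + "\n);"


if __name__ == "__main__":
    print(alb_insert_sql("2023-03-13 16:30:00.000000"))
```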
