This is a streaming data processing pipeline for blockchain (BC) data, written in Apache Beam. It could potentially run on Google Dataflow, Flink, Spark Streaming, Storm, and various other distributed computing platforms without maintaining multiple code branches.
This pipeline currently supports:
- Polkadot node logs over GKE
- Polkadot node chain data (Blocks & Transactions); `./run df` will do this in one shot
- Ethereum node chain data (Blocks & Transactions)
- For k8s/GKE system logs and metrics in Elasticsearch, please refer to Cloudy Moma
You can use this master branch as a skeleton Java project.
In theory, the Beam code can drive Spark, Flink, and other streaming frameworks; see here for details.
Stackdriver logging -> Pubsub -> Dataflow -> Elasticsearch
Java dev environment
- JDK8+
- Maven
Elasticsearch
- Option 1: Run your own (a minimal local sketch follows this list)
- Option 2: Run on k8s / GKE, recommended :)
- Option 3: Run on Elastic Cloud
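If you just want something to test against (Option 1), a single-node setup with Docker is enough. This is only a sketch; the image tags, container names, and ports below are assumptions, so pick versions that match your environment:

```bash
# Minimal local Elasticsearch + Kibana for testing only (not for production);
# image versions are illustrative.
docker run -d --name es -p 9200:9200 \
  -e "discovery.type=single-node" \
  docker.elastic.co/elasticsearch/elasticsearch:7.10.2

# Kibana's default config looks for a host named "elasticsearch"
docker run -d --name kibana -p 5601:5601 \
  --link es:elasticsearch \
  docker.elastic.co/kibana/kibana:7.10.2
```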
- Setup GCP
You could simply run `cd scripts && ./gcp_setup.sh; cd -`, but before that, make sure the parameters at the top of the script have been updated according to your environment, especially the `project` variable; the others are really optional.
This script will (rough gcloud equivalents are sketched below):
- Create a Pubsub topic and a subscription; this subscription should later be configured for the Dataflow job
- Set up a Stackdriver sink (to Pubsub) for HTTP load balancer logs
- Grant permissions to the service account used by the sink, which will publish logs to the Pubsub topic
Alternatively, you could do all of the above in the GCP console if you are familiar with the web UI.
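For reference, the script boils down to something like the following gcloud calls. The names and the log filter here are illustrative; the real values come from the variables at the top of `gcp_setup.sh`:

```bash
PROJECT=my-project            # assumption: replace with your project id
TOPIC=gclb-logs               # assumption: topic/subscription names are illustrative
SUB=gclb-logs-sub

# 1. Pubsub topic + subscription (the subscription is what the Dataflow job reads)
gcloud pubsub topics create "$TOPIC" --project "$PROJECT"
gcloud pubsub subscriptions create "$SUB" --topic "$TOPIC" --project "$PROJECT"

# 2. Stackdriver / Cloud Logging sink exporting HTTP load balancer logs to the topic
gcloud logging sinks create gclb-to-pubsub \
  "pubsub.googleapis.com/projects/$PROJECT/topics/$TOPIC" \
  --log-filter='resource.type="http_load_balancer"' \
  --project "$PROJECT"

# 3. Let the sink's writer identity (printed by the previous command) publish to the topic
gcloud pubsub topics add-iam-policy-binding "$TOPIC" --project "$PROJECT" \
  --member="serviceAccount:<sink-writer-identity>" \
  --role="roles/pubsub.publisher"
```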
- Setup Elasticsearch & Kibana
As with GCP, there is a script that gets the job done. Simply run `cd scripts/elastic && ./init.sh; cd -` and you're done. Also, make sure you have updated the parameters at the top of the `init.sh` script according to your Elasticsearch setup.
This script will (rough curl equivalents are sketched below):
- Create an ingest pipeline for GCLB logs, mainly for adding geo information and parsing the User-Agent field
- Create an index template in Elasticsearch, so if the index name starts with `gclb*` it will use the schema & settings defined here
- Create an index called `gclb-000001` and a write alias associated with it named `gclb-ingest`
- Create an index rollover policy for the created alias, so Dataflow only ever writes to that fixed name while more indices (`gclb-000002`, `gclb-000003`, etc.) are created underneath. The policy is defined here; you can update it to match your scenario. The default rollover policy creates a new index once the current one is 30 days old, hits 1 million docs, or reaches 5GB in size.
More information about index management - highly recommended for logging scenarios.
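As a rough guide to what `init.sh` configures, the steps map to Elasticsearch API calls along these lines. The endpoint, pipeline/policy names, and field paths are assumptions; the authoritative versions live under `scripts/elastic`:

```bash
ES=https://your-es-host:9200   # assumption: point this at your cluster

# 1. Ingest pipeline adding GeoIP data and parsing the User-Agent field
curl -X PUT "$ES/_ingest/pipeline/gclb" -H 'Content-Type: application/json' -d'
{
  "processors": [
    { "geoip":      { "field": "httpRequest.remoteIp",  "ignore_missing": true } },
    { "user_agent": { "field": "httpRequest.userAgent", "ignore_missing": true } }
  ]
}'

# 2. Index template for anything named gclb*, wired to the pipeline, alias and ILM policy
curl -X PUT "$ES/_index_template/gclb" -H 'Content-Type: application/json' -d'
{
  "index_patterns": ["gclb*"],
  "template": {
    "settings": {
      "index.default_pipeline": "gclb",
      "index.lifecycle.name": "gclb-policy",
      "index.lifecycle.rollover_alias": "gclb-ingest"
    }
  }
}'

# 3. Rollover policy: new index at 30 days, 1 million docs, or 5GB
curl -X PUT "$ES/_ilm/policy/gclb-policy" -H 'Content-Type: application/json' -d'
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_age": "30d", "max_docs": 1000000, "max_size": "5gb" }
        }
      }
    }
  }
}'

# 4. Bootstrap index plus its write alias (the fixed name the Dataflow job writes to)
curl -X PUT "$ES/gclb-000001" -H 'Content-Type: application/json' -d'
{
  "aliases": { "gclb-ingest": { "is_write_index": true } }
}'
```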
Now you're good to go.
Double check the parameters passed to the job trigger in the makefile, then:
make df
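Under the hood, the `df` target is expected to submit the Beam job to Dataflow. A typical Beam-on-Dataflow invocation looks roughly like the following; the main class and pipeline arguments here are placeholders, and the real ones are defined in the makefile:

```bash
# Illustrative only -- check the makefile for the actual class and arguments.
mvn compile exec:java \
  -Dexec.mainClass=com.example.LogPipeline \
  -Dexec.args="--runner=DataflowRunner \
               --project=my-project \
               --region=us-central1 \
               --streaming=true \
               --inputSubscription=projects/my-project/subscriptions/gclb-logs-sub"
```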
First of all, dropping the Elastic Stack's own access logs could be done either when creating the sink or in the Elasticsearch ingest pipeline. It's highly recommended to do it at the sink, which is more efficient; we only demonstrate the drop processor here in the code in case you need it for other purposes.
The reason we drop those logs is to prevent an infinite loop. Our Elastic Stack sits behind the Google Cloud Load Balancer, and the URLs for the Elasticsearch ingest nodes all contain the keyword `ingest`, so their access logs would be fed back through the logging pipeline forever. Imagine: POST data to the ingest nodes -> GCLB produces logs -> those logs get ingested -> over and over again.
So you may or may not need this; please adjust according to your environment.
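For the pipeline-side variant, the drop processor in the `gclb` ingest pipeline could look something like this sketch; the field path and the `ingest` keyword check are assumptions based on the description above. The sink-side equivalent (the recommended approach) is simply an extra exclusion in the sink's log filter:

```bash
ES=https://your-es-host:9200   # assumption: your cluster endpoint

# Drop GCLB access logs that hit the Elasticsearch ingest nodes themselves,
# before any further processing (the drop sits at the top of the processor list).
curl -X PUT "$ES/_ingest/pipeline/gclb" -H 'Content-Type: application/json' -d'
{
  "processors": [
    {
      "drop": {
        "if": "ctx.httpRequest?.requestUrl != null && ctx.httpRequest.requestUrl.contains(\"ingest\")"
      }
    },
    { "geoip":      { "field": "httpRequest.remoteIp",  "ignore_missing": true } },
    { "user_agent": { "field": "httpRequest.userAgent", "ignore_missing": true } }
  ]
}'

# Sink-side alternative (recommended): exclude those requests in the sink's log filter, e.g.
# gcloud logging sinks update gclb-to-pubsub \
#   --log-filter='resource.type="http_load_balancer" AND NOT httpRequest.requestUrl:"ingest"'
```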
First things first: all latency values, such as `httpRequest.backendLatency` and `httpRequest.frontendSrtt`, are presented in seconds. We have converted them to `ms` in the Kibana dashboard by using TSVB widgets :)
Import from this example