[Bug]: BigqueryIO is very slow if using storage api and dynamic destination to write data to over thousand different tables with high data skew #32508

Open
2 of 17 tasks
ns-shua opened this issue Sep 19, 2024 · 9 comments

ns-shua commented Sep 19, 2024

What happened?

I'm trying to use BigQueryIO with the Storage API in at-least-once mode (both the pipeline and the IO), as suggested. My requirement is to write data to over a thousand tables in different projects. The data is highly skewed: the top 10 tables can take 80% of the traffic. I observe that the pipeline becomes very slow and CPU utilization is almost always below 30%. I think this is a data skew problem, but our data is logically partitioned that way and I have no control over it. I tried writing the same volume of data to a single table (all the tables have the same schema), and it performs very well even with 1/4 of the machines. The documentation claims DynamicDestinations should perform as well as a single destination. Is there a known performance issue, or are there any suggestions?
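For a rough sense of how uneven the per-table load is under the numbers above, here is a small back-of-the-envelope sketch. It assumes exactly 1000 tables and an even split of traffic within the hot group and within the cold group; both are simplifications for illustration, not measurements from the job, and the class and method names are made up for this example.

```java
public class SkewEstimate {
    /** Share of total traffic per table in a group, assuming an even split within the group. */
    public static double perTableShare(double groupShare, int tablesInGroup) {
        return groupShare / tablesInGroup;
    }

    /** Ratio of the load on an average hot table to the load on an average cold table. */
    public static double hotToColdRatio(int totalTables, int hotTables, double hotShare) {
        double perHot = perTableShare(hotShare, hotTables);
        double perCold = perTableShare(1.0 - hotShare, totalTables - hotTables);
        return perHot / perCold;
    }

    public static void main(String[] args) {
        int totalTables = 1000;  // assumption: the report only says "over a thousand"
        int hotTables = 10;      // from the report: top 10 tables
        double hotShare = 0.80;  // from the report: about 80% of the traffic

        System.out.printf("per hot table:  %.4f%% of traffic%n",
                perTableShare(hotShare, hotTables) * 100);
        System.out.printf("per cold table: %.4f%% of traffic%n",
                perTableShare(1.0 - hotShare, totalTables - hotTables) * 100);
        System.out.printf("hot/cold ratio: %.0fx%n",
                hotToColdRatio(totalTables, hotTables, hotShare));
    }
}
```

Under these assumptions each hot table carries 8% of the traffic, roughly 396 times the load of an average cold table, so any work that is partitioned per destination would be very unevenly distributed.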

Here is the code I use to write to different tables:

BigQueryIO.<KV<TopicMetadata, T>>write()
        .withFormatFunction(...)
        .withoutValidation()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .optimizedWrites()
        .withClustering()
        .to(
            new SerializableFunction<..>() {...} // Here I tried both a SerializableFunction and a DynamicDestinations class
         );

This code performs much worse than

BigQueryIO.<KV<TopicMetadata, T>>write()
        .withFormatFunction(...)
        .withoutValidation()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .optimizedWrites()
        .withClustering()
        .to(
            "project_all.example_dataset.alldata"
         );

with the same amount of data.

When writing to different tables, CPU usage is constantly below 30%, while when writing to a single table it is constantly near 100%.
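For readers unfamiliar with per-element routing, the destination function elided in the first snippet maps each element's key to a table. The sketch below shows only the general shape of such a mapping in plain Java, with no Beam dependency. The `TopicMetadata` fields (`project`, `dataset`, `topic`) and the `tableSpecFor` helper are hypothetical stand-ins, not the author's actual code; the `project.dataset.table` format follows the single-table example above.

```java
import java.util.Objects;

public class TableRouting {
    /** Hypothetical stand-in for the TopicMetadata key; these fields are assumptions. */
    public static final class TopicMetadata {
        final String project;
        final String dataset;
        final String topic;

        public TopicMetadata(String project, String dataset, String topic) {
            this.project = Objects.requireNonNull(project, "project");
            this.dataset = Objects.requireNonNull(dataset, "dataset");
            this.topic = Objects.requireNonNull(topic, "topic");
        }
    }

    /** Builds a "project.dataset.table" spec from the key, one table per topic. */
    public static String tableSpecFor(TopicMetadata metadata) {
        Objects.requireNonNull(metadata, "metadata");
        return metadata.project + "." + metadata.dataset + "." + metadata.topic;
    }

    public static void main(String[] args) {
        TopicMetadata key = new TopicMetadata("project_a", "example_dataset", "orders");
        System.out.println(tableSpecFor(key));
    }
}
```

In the real pipeline a function like this would be wrapped in the `SerializableFunction` or `DynamicDestinations` passed to `.to(...)`, so the number of distinct destinations equals the number of distinct keys.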

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@liferoad
Collaborator

Have you tried to profile the pipeline to figure out some potential issues?
cc @ahmedabu98

@ns-shua
Author

ns-shua commented Sep 20, 2024

@liferoad There are some upstream transforms I could improve, but they have nothing to do with the BigQuery write. The only difference in the code is writing to one table versus writing to many tables.

@liferoad
Collaborator

liferoad commented Sep 20, 2024

Added the dev list thread here: https://lists.apache.org/thread/gz5zhnworvcjog0o4g96lsqbw5tz6y03
@ns-shua Have you opened a customer support ticket for Dataflow? It will be helpful to check your Dataflow jobs.

@ahmedabu98
Contributor

@ns-shua
Author

ns-shua commented Sep 21, 2024

@ahmedabu98
I believe writing to one table won't work if I don't use the connection pool. So yes, I've set it to true.
@liferoad
I asked on the mailing list and also created a support ticket, but so far I have not received any useful help or tips. They mentioned they found a hot key? I'm not sure. Can you explain how auto-sharding behaves when the data volume is highly skewed across the tables? Does it create more workers for hot tables?

@liferoad
Collaborator

What is your support ticket number? Is this streaming or batch?

@ns-shua
Author

ns-shua commented Sep 21, 2024

@liferoad Case 53209037
I'm confused by the memory dump: I do see a lot of StorageApiWriteUnshardedRecords, but I have withAutoSharding() in my code.

@ns-shua
Author

ns-shua commented Sep 21, 2024

It is streaming, in at-least-once mode.

@liferoad
Collaborator

Can you share your complete, latest code if possible? From the ticket, it seems the job with withAutoSharding() does not scale down.
