From e581c100b78cefb13739182d7a7062408eeb1236 Mon Sep 17 00:00:00 2001 From: parmesant Date: Mon, 5 Aug 2024 14:06:46 +0530 Subject: [PATCH] fix: update object store crate to 0.10.2 (#870) Co-authored-by: parmesant --- Cargo.lock | 353 +++++++++++++-------- server/Cargo.toml | 20 +- server/src/handlers/http/llm.rs | 2 +- server/src/hottier.rs | 1 + server/src/query.rs | 10 +- server/src/query/filter_optimizer.rs | 307 +++++++++--------- server/src/query/stream_schema_provider.rs | 41 ++- server/src/storage/metrics_layer.rs | 48 ++- server/src/storage/s3.rs | 99 +++--- 9 files changed, 497 insertions(+), 384 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a3db914e6..0b5e8b138 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,14 +48,14 @@ dependencies = [ "ahash", "base64 0.21.7", "bitflags 2.5.0", - "brotli", + "brotli 3.5.0", "bytes", "bytestring", "derive_more", "encoding_rs", "flate2", "futures-core", - "h2", + "h2 0.3.26", "http 0.2.12", "httparse", "httpdate", @@ -425,9 +425,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219d05930b81663fd3b32e3bde8ce5bff3c4d23052a99f11a8fa50a3b47b2658" +checksum = "05048a8932648b63f21c37d88b552ccc8a65afb6dfe9fc9f30ce79174c2e7a85" dependencies = [ "arrow-arith", "arrow-array", @@ -446,9 +446,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0272150200c07a86a390be651abdd320a2d12e84535f0837566ca87ecd8f95e0" +checksum = "1d8a57966e43bfe9a3277984a14c24ec617ad874e4c0e1d2a1b083a39cfbf22c" dependencies = [ "arrow-array", "arrow-buffer", @@ -461,9 +461,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8010572cf8c745e242d1b632bd97bd6d4f40fefed5ed1290a8f433abaa686fea" +checksum = "16f4a9468c882dc66862cef4e1fd8423d47e67972377d85d80e022786427768c" dependencies = [ "ahash", "arrow-buffer", @@ -472,15 +472,15 @@ dependencies = [ "chrono", "chrono-tz", "half", - "hashbrown 0.14.3", + "hashbrown 0.14.5", "num", ] [[package]] name = "arrow-buffer" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d0a2432f0cba5692bf4cb757469c66791394bac9ec7ce63c1afe74744c37b27" +checksum = "c975484888fc95ec4a632cdc98be39c085b1bb518531b0c80c5d462063e5daa1" dependencies = [ "bytes", "half", @@ -489,9 +489,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9abc10cd7995e83505cc290df9384d6e5412b207b79ce6bdff89a10505ed2cba" +checksum = "da26719e76b81d8bc3faad1d4dbdc1bcc10d14704e63dc17fc9f3e7e1e567c8e" dependencies = [ "arrow-array", "arrow-buffer", @@ -510,9 +510,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95cbcba196b862270bf2a5edb75927380a7f3a163622c61d40cbba416a6305f2" +checksum = "c13c36dc5ddf8c128df19bab27898eea64bf9da2b555ec1cd17a8ff57fba9ec2" dependencies = [ "arrow-array", "arrow-buffer", @@ -529,9 +529,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "51.0.0" +version = "52.2.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "2742ac1f6650696ab08c88f6dd3f0eb68ce10f8c253958a18c943a68cd04aec5" +checksum = "dd9d6f18c65ef7a2573ab498c374d8ae364b4a4edf67105357491c031f716ca5" dependencies = [ "arrow-buffer", "arrow-schema", @@ -541,9 +541,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3241ce691192d789b7b94f56a10e166ee608bdc3932c759eb0b85f09235352bb" +checksum = "8e7ffbc96072e466ae5188974725bb46757587eafe427f77a25b828c375ae882" dependencies = [ "arrow-array", "arrow-buffer", @@ -562,9 +562,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a42ea853130f7e78b9b9d178cb4cd01dee0f78e64d96c2949dc0a915d6d9e19d" +checksum = "e786e1cdd952205d9a8afc69397b317cfbb6e0095e445c69cda7e8da5c1eeb0f" dependencies = [ "arrow-array", "arrow-buffer", @@ -578,9 +578,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaafb5714d4e59feae964714d724f880511500e3569cc2a94d02456b403a2a49" +checksum = "fb22284c5a2a01d73cebfd88a33511a3234ab45d66086b2ca2d1228c3498e445" dependencies = [ "arrow-array", "arrow-buffer", @@ -598,9 +598,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3e6b61e3dc468f503181dccc2fc705bdcc5f2f146755fa5b56d0a6c5943f412" +checksum = "42745f86b1ab99ef96d1c0bcf49180848a64fe2c7a7a0d945bc64fa2b21ba9bc" dependencies = [ "arrow-array", "arrow-buffer", @@ -613,9 +613,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "848ee52bb92eb459b811fb471175ea3afcf620157674c8794f539838920f9228" +checksum = "4cd09a518c602a55bd406bcc291a967b284cfa7a63edfbf8b897ea4748aad23c" dependencies = [ "ahash", "arrow-array", @@ -623,23 +623,22 @@ dependencies = [ "arrow-data", "arrow-schema", "half", - "hashbrown 0.14.3", ] [[package]] name = "arrow-schema" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02d9483aaabe910c4781153ae1b6ae0393f72d9ef757d38d09d450070cf2e528" +checksum = "9e972cd1ff4a4ccd22f86d3e53e835c2ed92e0eea6a3e8eadb72b4f1ac802cf8" dependencies = [ "serde", ] [[package]] name = "arrow-select" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "849524fa70e0e3c5ab58394c770cb8f514d0122d20de08475f7b472ed8075830" +checksum = "600bae05d43483d216fb3494f8c32fdbefd8aa4e1de237e790dbb3d9f44690a3" dependencies = [ "ahash", "arrow-array", @@ -651,9 +650,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "51.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9373cb5a021aee58863498c37eb484998ef13377f69989c6c5ccfbd258236cdb" +checksum = "f0dc1985b67cb45f6606a248ac2b4a288849f196bab8c657ea5589f47cdd55e6" dependencies = [ "arrow-array", "arrow-buffer", @@ -726,6 +725,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" 
+ [[package]] name = "autocfg" version = "1.2.0" @@ -883,7 +888,18 @@ checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", - "brotli-decompressor", + "brotli-decompressor 2.5.1", +] + +[[package]] +name = "brotli" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor 4.0.1", ] [[package]] @@ -896,6 +912,16 @@ dependencies = [ "alloc-stdlib", ] +[[package]] +name = "brotli-decompressor" +version = "4.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -1039,9 +1065,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.8.6" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" +checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" dependencies = [ "chrono", "chrono-tz-build", @@ -1050,9 +1076,9 @@ dependencies = [ [[package]] name = "chrono-tz-build" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f" +checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" dependencies = [ "parse-zoneinfo", "phf", @@ -1340,7 +1366,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.3", + "hashbrown 0.14.5", "lock_api", "once_cell", "parking_lot_core", @@ -1354,9 +1380,8 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" [[package]] name = "datafusion" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85069782056753459dc47e386219aa1fdac5b731f26c28abb8c0ffd4b7c5ab11" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "ahash", "arrow", @@ -1374,23 +1399,26 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-functions", + "datafusion-functions-aggregate", "datafusion-functions-array", "datafusion-optimizer", "datafusion-physical-expr", + "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-sql", "flate2", "futures", "glob", "half", - "hashbrown 0.14.3", + "hashbrown 0.14.5", "indexmap 2.2.6", - "itertools", + "itertools 0.12.1", "log", "num_cpus", "object_store", "parking_lot", "parquet", + "paste", "pin-project-lite", "rand", "sqlparser", @@ -1405,9 +1433,8 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "309d9040751f6dc9e33c85dce6abb55a46ef7ea3644577dd014611c379447ef3" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "ahash", "arrow", @@ -1416,6 +1443,7 @@ dependencies = [ "arrow-schema", "chrono", "half", + "hashbrown 
0.14.5", "instant", "libc", "num_cpus", @@ -1426,18 +1454,16 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e4a44d8ef1b1e85d32234e6012364c411c3787859bb3bba893b0332cb03dfd" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "tokio", ] [[package]] name = "datafusion-execution" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06a3a29ae36bcde07d179cc33b45656a8e7e4d023623e320e48dcf1200eeee95" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "arrow", "chrono", @@ -1445,7 +1471,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "futures", - "hashbrown 0.14.3", + "hashbrown 0.14.5", "log", "object_store", "parking_lot", @@ -1456,16 +1482,17 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a3542aa322029c2121a671ce08000d4b274171070df13f697b14169ccf4f628" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "ahash", "arrow", "arrow-array", + "arrow-buffer", "chrono", "datafusion-common", "paste", + "serde_json", "sqlparser", "strum", "strum_macros", @@ -1473,9 +1500,8 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd221792c666eac174ecc09e606312844772acc12cbec61a420c2fca1ee70959" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "arrow", "base64 0.22.0", @@ -1485,22 +1511,39 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", - "datafusion-physical-expr", + "hashbrown 0.14.5", "hex", - "itertools", + "itertools 0.12.1", "log", "md-5", + "rand", "regex", "sha2", "unicode-segmentation", "uuid", ] +[[package]] +name = "datafusion-functions-aggregate" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" +dependencies = [ + "ahash", + "arrow", + "arrow-schema", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr-common", + "log", + "paste", + "sqlparser", +] + [[package]] name = "datafusion-functions-array" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e501801e84d9c6ef54caaebcda1b18a6196a24176c12fb70e969bc0572e03c55" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "arrow", "arrow-array", @@ -1511,16 +1554,15 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-functions", - "itertools", + "itertools 0.12.1", "log", "paste", ] [[package]] name = "datafusion-optimizer" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"76bd7f5087817deb961764e8c973d243b54f8572db414a8f0a8f33a48f991e0a" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "arrow", "async-trait", @@ -1528,17 +1570,18 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown 0.14.3", - "itertools", + "hashbrown 0.14.5", + "indexmap 2.2.6", + "itertools 0.12.1", "log", + "paste", "regex-syntax", ] [[package]] name = "datafusion-physical-expr" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cabc0d9aaa0f5eb1b472112f16223c9ffd2fb04e58cbf65c0a331ee6e993f96" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "ahash", "arrow", @@ -1548,37 +1591,45 @@ dependencies = [ "arrow-schema", "arrow-string", "base64 0.22.0", - "blake2", - "blake3", "chrono", "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-physical-expr-common", "half", - "hashbrown 0.14.3", + "hashbrown 0.14.5", "hex", "indexmap 2.2.6", - "itertools", + "itertools 0.12.1", "log", - "md-5", "paste", "petgraph", - "rand", "regex", - "sha2", - "unicode-segmentation", +] + +[[package]] +name = "datafusion-physical-expr-common" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" +dependencies = [ + "ahash", + "arrow", + "datafusion-common", + "datafusion-expr", + "hashbrown 0.14.5", + "rand", ] [[package]] name = "datafusion-physical-plan" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17c0523e9c8880f2492a88bbd857dde02bed1ed23f3e9211a89d3d7ec3b44af9" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "ahash", "arrow", "arrow-array", "arrow-buffer", + "arrow-ord", "arrow-schema", "async-trait", "chrono", @@ -1586,12 +1637,14 @@ dependencies = [ "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", + "datafusion-functions-aggregate", "datafusion-physical-expr", + "datafusion-physical-expr-common", "futures", "half", - "hashbrown 0.14.3", + "hashbrown 0.14.5", "indexmap 2.2.6", - "itertools", + "itertools 0.12.1", "log", "once_cell", "parking_lot", @@ -1602,9 +1655,8 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "37.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49eb54b42227136f6287573f2434b1de249fe1b8e6cd6cc73a634e4a3ec29356" +version = "39.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=a64df83502821f18067fb4ff65dd217815b305c9#a64df83502821f18067fb4ff65dd217815b305c9" dependencies = [ "arrow", "arrow-array", @@ -1612,6 +1664,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "log", + "regex", "sqlparser", "strum", ] @@ -1743,9 +1796,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flatbuffers" -version = "23.5.26" +version = "24.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" +checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" dependencies = [ "bitflags 
1.3.2", "rustc_version", @@ -1946,6 +1999,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.4.1" @@ -1974,9 +2046,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.3" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash", "allocator-api2", @@ -2142,7 +2214,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -2158,13 +2230,14 @@ dependencies = [ [[package]] name = "hyper" -version = "1.3.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", "futures-util", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.0", "httparse", @@ -2197,7 +2270,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.3.1", + "hyper 1.4.1", "hyper-util", "rustls 0.22.4", "rustls-pki-types", @@ -2220,16 +2293,16 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.3" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" dependencies = [ "bytes", "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.3.1", + "hyper 1.4.1", "pin-project-lite", "socket2", "tokio", @@ -2300,7 +2373,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", - "hashbrown 0.14.3", + "hashbrown 0.14.5", ] [[package]] @@ -2347,6 +2420,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -2741,24 +2823,24 @@ dependencies = [ [[package]] name = "object_store" -version = "0.9.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3" +checksum = "e6da452820c715ce78221e8202ccc599b4a52f3e1eb3eedb487b680c81a8e3f3" dependencies = [ "async-trait", - "base64 0.21.7", + "base64 0.22.0", "bytes", "chrono", "futures", "humantime", - "hyper 0.14.28", - "itertools", + "hyper 1.4.1", + "itertools 0.13.0", "md-5", "parking_lot", "percent-encoding", "quick-xml", "rand", - "reqwest 0.11.27", + "reqwest 0.12.4", "ring", "serde", "serde_json", @@ -2840,9 +2922,9 @@ dependencies = [ [[package]] name = "parquet" -version = "51.0.0" +version 
= "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "096795d4f47f65fd3ee1ec5a98b77ab26d602f2cc785b0e4be5443add17ecc32" +checksum = "e977b9066b4d3b03555c22bdc442f3fadebd96a39111249113087d0edb2691cd" dependencies = [ "ahash", "arrow-array", @@ -2853,13 +2935,13 @@ dependencies = [ "arrow-schema", "arrow-select", "base64 0.22.0", - "brotli", + "brotli 6.0.0", "bytes", "chrono", "flate2", "futures", "half", - "hashbrown 0.14.3", + "hashbrown 0.14.5", "lz4_flex", "num", "num-bigint", @@ -2871,6 +2953,7 @@ dependencies = [ "tokio", "twox-hash", "zstd 0.13.1", + "zstd-sys", ] [[package]] @@ -2925,7 +3008,7 @@ dependencies = [ "human-size", "humantime", "humantime-serde", - "itertools", + "itertools 0.13.0", "log", "maplit", "mime", @@ -2985,9 +3068,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.14" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" [[package]] name = "path-clean" @@ -3194,7 +3277,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "811031bea65e5a401fb2e1f37d802cca6601e204ac463809a3189352d13b78a5" dependencies = [ "chrono", - "itertools", + "itertools 0.12.1", "once_cell", "regex", ] @@ -3217,7 +3300,7 @@ checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", "heck 0.5.0", - "itertools", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -3237,7 +3320,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48" dependencies = [ "anyhow", - "itertools", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.60", @@ -3276,9 +3359,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.31.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc" dependencies = [ "memchr", "serde", @@ -3410,7 +3493,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", @@ -3423,7 +3506,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls 0.21.11", - "rustls-native-certs", "rustls-pemfile 1.0.4", "serde", "serde_json", @@ -3432,12 +3514,10 @@ dependencies = [ "system-configuration", "tokio", "tokio-rustls 0.24.1", - "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", "web-sys", "webpki-roots 0.25.4", "winreg 0.50.0", @@ -3453,10 +3533,11 @@ dependencies = [ "bytes", "futures-core", "futures-util", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.4.1", "hyper-rustls 0.26.0", "hyper-util", "ipnet", @@ -3467,6 +3548,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls 0.22.4", + "rustls-native-certs", "rustls-pemfile 2.1.2", "rustls-pki-types", "serde", @@ -3475,10 +3557,12 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-rustls 0.25.0", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 0.26.1", "winreg 0.52.0", @@ -3598,12 +3682,13 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.6.3" +version 
= "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" dependencies = [ "openssl-probe", - "rustls-pemfile 1.0.4", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "schannel", "security-framework", ] @@ -3927,9 +4012,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "sqlparser" -version = "0.44.0" +version = "0.47.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf9c7ff146298ffda83a200f8d5084f08dcee1edfc135fcc1d646a45d50ffd6" +checksum = "295e9930cd7a97e58ca2a070541a3ca502b17f5d1fa7157376d0fabd85324f25" dependencies = [ "log", "sqlparser_derive", @@ -4027,9 +4112,9 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "sysinfo" -version = "0.30.11" +version = "0.30.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87341a165d73787554941cd5ef55ad728011566fe714e987d1b976c15dbc3a83" +checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3" dependencies = [ "cfg-if", "core-foundation-sys", @@ -4307,7 +4392,7 @@ dependencies = [ "base64 0.21.7", "bytes", "flate2", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", diff --git a/server/Cargo.toml b/server/Cargo.toml index 4b8f161a7..b48dd530b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,15 +10,15 @@ build = "build.rs" [dependencies] ### apache arrow/datafusion dependencies # arrow = "51.0.0" -arrow-schema = { version = "51.0.0", features = ["serde"] } -arrow-array = { version = "51.0.0" } -arrow-json = "51.0.0" -arrow-ipc = { version = "51.0.0", features = ["zstd"] } -arrow-select = "51.0.0" -datafusion = "37.1.0" -object_store = { version = "0.9.1", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up -parquet = "51.0.0" -arrow-flight = { version = "51.0.0", features = [ "tls" ] } +arrow-schema = { version = "52.1.0", features = ["serde"] } +arrow-array = { version = "52.1.0" } +arrow-json = "52.1.0" +arrow-ipc = { version = "52.1.0", features = ["zstd"] } +arrow-select = "52.1.0" +datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" } +object_store = { version = "0.10.2", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up +parquet = "52.1.0" +arrow-flight = { version = "52.1.0", features = [ "tls" ] } tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] } tonic-web = "0.11.0" tower-http = { version = "0.4.4", features = ["cors"] } @@ -62,7 +62,7 @@ hex = "0.4" hostname = "0.4.0" http = "0.2.7" humantime-serde = "1.1" -itertools = "0.12.1" +itertools = "0.13.0" log = "0.4" num_cpus = "1.15" once_cell = "1.17.1" diff --git a/server/src/handlers/http/llm.rs b/server/src/handlers/http/llm.rs index 87f027d2d..bf1b2968a 100644 --- a/server/src/handlers/http/llm.rs +++ b/server/src/handlers/http/llm.rs @@ -95,7 +95,7 @@ pub async fn make_llm_request(body: web::Json) -> Result for TableScanVisitor { type Node = LogicalPlan; fn f_down(&mut self, node: &Self::Node) -> Result { @@ -232,13 +232,13 @@ fn transform( _start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned_reference()), + 
Some(table.table_name.to_owned()), time_partition.clone(), ))); _end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned_reference()), + Some(table.table_name.to_owned()), time_partition, ))); } @@ -246,13 +246,13 @@ fn transform( _start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned_reference()), + Some(table.table_name.to_owned()), event::DEFAULT_TIMESTAMP_KEY, ))); _end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned_reference()), + Some(table.table_name.to_owned()), event::DEFAULT_TIMESTAMP_KEY, ))); } diff --git a/server/src/query/filter_optimizer.rs b/server/src/query/filter_optimizer.rs index d78d0fce9..dfb52db03 100644 --- a/server/src/query/filter_optimizer.rs +++ b/server/src/query/filter_optimizer.rs @@ -1,149 +1,158 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use std::{collections::HashMap, sync::Arc}; - -use datafusion::{ - common::{DFField, DFSchema}, - logical_expr::{Filter, LogicalPlan, Projection}, - optimizer::{optimize_children, OptimizerRule}, - prelude::{lit, or, Column, Expr}, - scalar::ScalarValue, -}; - -/// Rewrites logical plan for source using projection and filter -pub struct FilterOptimizerRule { - pub column: String, - pub literals: Vec, -} - -// Try to add filter node on table scan -// As every table supports projection push down -// we try to directly add projection for column directly to table -// To preserve the orignal projection we must add a projection node with orignal projection -impl OptimizerRule for FilterOptimizerRule { - fn try_optimize( - &self, - plan: &datafusion::logical_expr::LogicalPlan, - config: &dyn datafusion::optimizer::OptimizerConfig, - ) -> datafusion::error::Result> { - // if there are no patterns then the rule cannot be performed - let Some(filter_expr) = self.expr() else { - return Ok(None); - }; - - if let LogicalPlan::Filter(filter) = plan { - if filter.predicate == filter_expr { - return Ok(None); - } - } - - if let LogicalPlan::TableScan(table) = plan { - if table.projection.is_none() - || table - .filters - .iter() - .any(|expr| self.contains_valid_tag_filter(expr)) - { - return Ok(None); - } - - let mut table = table.clone(); - let schema = &table.source.schema(); - let orignal_projection = table.projected_schema.clone(); - - // add filtered column projection to table - if !table - .projected_schema - .has_column_with_unqualified_name(&self.column) - { - let tags_index = schema.index_of(&self.column)?; - let tags_field = schema.field(tags_index); - // modify source table projection to include tags - let mut df_schema = table.projected_schema.fields().clone(); - 
df_schema.push(DFField::new( - Some(table.table_name.clone()), - tags_field.name(), - tags_field.data_type().clone(), - tags_field.is_nullable(), - )); - - table.projected_schema = - Arc::new(DFSchema::new_with_metadata(df_schema, HashMap::default())?); - if let Some(projection) = &mut table.projection { - projection.push(tags_index) - } - } - - let filter = LogicalPlan::Filter(Filter::try_new( - filter_expr, - Arc::new(LogicalPlan::TableScan(table)), - )?); - let plan = LogicalPlan::Projection(Projection::new_from_schema( - Arc::new(filter), - orignal_projection, - )); - - return Ok(Some(plan)); - } - - // If we didn't find anything then recurse as normal and build the result. - optimize_children(self, plan, config) - } - - fn name(&self) -> &str { - "parseable_read_filter" - } -} - -impl FilterOptimizerRule { - fn expr(&self) -> Option { - let mut patterns = self.literals.iter().map(|literal| { - Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal))) - }); - - let mut filter_expr = patterns.next()?; - for expr in patterns { - filter_expr = or(filter_expr, expr) - } - - Some(filter_expr) - } - - fn contains_valid_tag_filter(&self, expr: &Expr) -> bool { - match expr { - Expr::Like(like) => { - let matches_column = match &*like.expr { - Expr::Column(column) => column.name == self.column, - _ => return false, - }; - - let matches_pattern = match &*like.pattern { - Expr::Literal(ScalarValue::Utf8(Some(literal))) => { - let literal = literal.trim_matches('%'); - self.literals.iter().any(|x| x == literal) - } - _ => false, - }; - - matches_column && matches_pattern && !like.negated - } - _ => false, - } - } -} +// /* +// * Parseable Server (C) 2022 - 2024 Parseable, Inc. +// * +// * This program is free software: you can redistribute it and/or modify +// * it under the terms of the GNU Affero General Public License as +// * published by the Free Software Foundation, either version 3 of the +// * License, or (at your option) any later version. +// * +// * This program is distributed in the hope that it will be useful, +// * but WITHOUT ANY WARRANTY; without even the implied warranty of +// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// * GNU Affero General Public License for more details. +// * +// * You should have received a copy of the GNU Affero General Public License +// * along with this program. If not, see . 
+// * +// */ +// use std::{collections::HashMap, sync::Arc}; + +// use arrow_schema::Field; +// use datafusion::{ +// common::DFSchema, +// logical_expr::{Filter, LogicalPlan, Projection}, +// optimizer::{optimize_children, OptimizerRule}, +// prelude::{lit, or, Column, Expr}, +// scalar::ScalarValue, +// }; + +// /// Rewrites logical plan for source using projection and filter +// pub struct FilterOptimizerRule { +// pub column: String, +// pub literals: Vec, +// } + +// // Try to add filter node on table scan +// // As every table supports projection push down +// // we try to directly add projection for column directly to table +// // To preserve the orignal projection we must add a projection node with orignal projection +// impl OptimizerRule for FilterOptimizerRule { +// fn try_optimize( +// &self, +// plan: &datafusion::logical_expr::LogicalPlan, +// config: &dyn datafusion::optimizer::OptimizerConfig, +// ) -> datafusion::error::Result> { +// // if there are no patterns then the rule cannot be performed +// let Some(filter_expr) = self.expr() else { +// return Ok(None); +// }; + +// if let LogicalPlan::Filter(filter) = plan { +// if filter.predicate == filter_expr { +// return Ok(None); +// } +// } + +// if let LogicalPlan::TableScan(table) = plan { +// if table.projection.is_none() +// || table +// .filters +// .iter() +// .any(|expr| self.contains_valid_tag_filter(expr)) +// { +// return Ok(None); +// } + +// let mut table = table.clone(); +// let schema = &table.source.schema(); +// let orignal_projection = table.projected_schema.clone(); + +// // add filtered column projection to table +// if !table +// .projected_schema +// .has_column_with_unqualified_name(&self.column) +// { +// let tags_index = schema.index_of(&self.column)?; +// let tags_field = schema.field(tags_index); +// // modify source table projection to include tags +// let df_schema = table.projected_schema.fields().clone(); + +// // from datafusion 37.1.0 -> 40.0.0 +// // `DFField` has been removed +// // `DFSchema.new_with_metadata()` has changed +// // it requires `qualified_fields`(`Vec<(Option, Arc)>`) instead of `fields` +// // hence, use `DFSchema::from_unqualified_fields()` for relatively unchanged code + +// df_schema.to_vec().push(Arc::new(Field::new( +// tags_field.name(), +// tags_field.data_type().clone(), +// tags_field.is_nullable(), +// ))); + +// table.projected_schema = +// Arc::new(DFSchema::from_unqualified_fields(df_schema, HashMap::default())?); +// if let Some(projection) = &mut table.projection { +// projection.push(tags_index) +// } +// } + +// let filter = LogicalPlan::Filter(Filter::try_new( +// filter_expr, +// Arc::new(LogicalPlan::TableScan(table)), +// )?); +// let plan = LogicalPlan::Projection(Projection::new_from_schema( +// Arc::new(filter), +// orignal_projection, +// )); + +// return Ok(Some(plan)); +// } + +// // If we didn't find anything then recurse as normal and build the result. 
+ +// // TODO: replace `optimize_children()` since it will be removed +// // But it is not being used anywhere, so might as well just let it be for now +// optimize_children(self, plan, config) +// } + +// fn name(&self) -> &str { +// "parseable_read_filter" +// } +// } + +// impl FilterOptimizerRule { +// fn expr(&self) -> Option<Expr> { +// let mut patterns = self.literals.iter().map(|literal| { +// Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal))) +// }); + +// let mut filter_expr = patterns.next()?; +// for expr in patterns { +// filter_expr = or(filter_expr, expr) +// } + +// Some(filter_expr) +// } + +// fn contains_valid_tag_filter(&self, expr: &Expr) -> bool { +// match expr { +// Expr::Like(like) => { +// let matches_column = match &*like.expr { +// Expr::Column(column) => column.name == self.column, +// _ => return false, +// }; + +// let matches_pattern = match &*like.pattern { +// Expr::Literal(ScalarValue::Utf8(Some(literal))) => { +// let literal = literal.trim_matches('%'); +// self.literals.iter().any(|x| x == literal) +// } +// _ => false, +// }; + +// matches_column && matches_pattern && !like.negated +// } +// _ => false, +// } +// } +// } diff --git a/server/src/query/stream_schema_provider.rs index 98d097c10..6e37a5f36 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -491,18 +491,28 @@ impl TableProvider for StandardTableProvider { )?) } - fn supports_filter_pushdown( + /* + Updated the function signature (and name) + Now it handles multiple filters + */ + fn supports_filters_pushdown( &self, - filter: &Expr, - ) -> Result<TableProviderFilterPushDown, DataFusionError> { - if expr_in_boundary(filter) { - // if filter can be handled by time partiton pruning, it is exact - Ok(TableProviderFilterPushDown::Exact) - } else { - // otherwise, we still might be able to handle the filter with file - // level mechanisms such as Parquet row group pruning. - Ok(TableProviderFilterPushDown::Inexact) - } + filters: &[&Expr], + ) -> Result<Vec<TableProviderFilterPushDown>, DataFusionError> { + let res_vec = filters + .iter() + .map(|filter| { + if expr_in_boundary(filter) { + // if filter can be handled by time partition pruning, it is exact + TableProviderFilterPushDown::Exact + } else { + // otherwise, we still might be able to handle the filter with file + // level mechanisms such as Parquet row group pruning.
+ TableProviderFilterPushDown::Inexact + } + }) + .collect_vec(); + Ok(res_vec) } } @@ -722,11 +732,15 @@ fn extract_from_lit(expr: BinaryExpr, time_partition: Option<String>) -> Option<NaiveDateTime> { } } +/* `BinaryExpr` doesn't implement `Copy` */ fn extract_timestamp_bound( binexpr: BinaryExpr, time_partition: Option<String>, ) -> Option<(Operator, NaiveDateTime)> { - Some((binexpr.op, extract_from_lit(binexpr, time_partition)?)) + Some(( + binexpr.op.clone(), + extract_from_lit(binexpr, time_partition)?, + )) } async fn collect_manifest_files( @@ -796,7 +810,8 @@ trait ManifestExt: ManifestFile { let Expr::Literal(value) = &*expr.right else { return None; }; - Some((expr.op, value)) + /* `BinaryExpr` doesn't implement `Copy` */ + Some((expr.op.clone(), value)) } let Some(col) = self.find_matching_column(partial_filter) else { diff --git a/server/src/storage/metrics_layer.rs index 04c2f3346..9cf125803 100644 --- a/server/src/storage/metrics_layer.rs +++ b/server/src/storage/metrics_layer.rs @@ -26,10 +26,9 @@ use async_trait::async_trait; use bytes::Bytes; use futures_util::{stream::BoxStream, Stream, StreamExt}; use object_store::{ - path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, - PutOptions, PutResult, Result as ObjectStoreResult, + path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult, }; -use tokio::io::AsyncWrite; /* NOTE: Keeping these imports as they would make migration to object_store 0.10.0 easier use object_store::{MultipartUpload, PutMultipartOpts, PutPayload} @@ -60,7 +59,7 @@ impl ObjectStore for MetricLayer { async fn put( &self, location: &Path, - bytes: Bytes, /* PutPayload */ + bytes: PutPayload, /* PutPayload */ ) -> ObjectStoreResult<PutResult> { let time = time::Instant::now(); let put_result = self.inner.put(location, bytes).await?; @@ -74,7 +73,7 @@ impl ObjectStore for MetricLayer { async fn put_opts( &self, location: &Path, - payload: Bytes, /* PutPayload */ + payload: PutPayload, /* PutPayload */ opts: PutOptions, ) -> ObjectStoreResult<PutResult> { let time = time::Instant::now(); @@ -86,22 +85,22 @@ impl ObjectStore for MetricLayer { return Ok(put_result); } - // ! removed in object_store 0.10.0 - async fn abort_multipart( - &self, - location: &Path, - multipart_id: &MultipartId, - ) -> object_store::Result<()> { - let time = time::Instant::now(); - let elapsed = time.elapsed().as_secs_f64(); - self.inner.abort_multipart(location, multipart_id).await?; - QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["PUT_MULTIPART_ABORT", "200"]) - .observe(elapsed); - Ok(()) - } - - /* Keep for easier migration to object_store 0.10.0 + // // ! 
removed in object_store 0.10.0 + // async fn abort_multipart( + // &self, + // location: &Path, + // multipart_id: &MultipartId, + // ) -> object_store::Result<()> { + // let time = time::Instant::now(); + // let elapsed = time.elapsed().as_secs_f64(); + // self.inner.abort_multipart(location, multipart_id).await?; + // QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + // .with_label_values(&["PUT_MULTIPART_ABORT", "200"]) + // .observe(elapsed); + // Ok(()) + // } + + /* Keep for easier migration to object_store 0.10.0 */ async fn put_multipart_opts( &self, location: &Path, @@ -115,13 +114,10 @@ impl ObjectStore for MetricLayer { .observe(elapsed); Ok(multipart_upload) - } */ + } // todo completly tracking multipart upload - async fn put_multipart( - &self, - location: &Path, - ) -> ObjectStoreResult<(MultipartId, Box)> /* ObjectStoreResult> */ + async fn put_multipart(&self, location: &Path) -> ObjectStoreResult> /* ObjectStoreResult<(MultipartId, Box)> */ { let time = time::Instant::now(); let multipart_upload = self.inner.put_multipart(location).await?; diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index ff4d26162..67c056006 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -28,10 +28,8 @@ use futures::{StreamExt, TryStreamExt}; use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum}; use object_store::limit::LimitStore; use object_store::path::Path as StorePath; -use object_store::{ClientOptions, ObjectStore}; +use object_store::{ClientOptions, ObjectStore, PutPayload}; use relative_path::{RelativePath, RelativePathBuf}; -use tokio::fs::OpenOptions; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::collections::BTreeMap; use std::iter::Iterator; @@ -49,6 +47,7 @@ use super::{ ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; +#[allow(dead_code)] // in bytes const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100; const CONNECT_TIMEOUT_SECS: u64 = 5; @@ -253,7 +252,7 @@ impl S3 { async fn _put_object( &self, path: &RelativePath, - resource: Bytes, + resource: PutPayload, ) -> Result<(), ObjectStorageError> { let time = Instant::now(); let resp = self.client.put(&to_object_store_path(path), resource).await; @@ -377,10 +376,15 @@ impl S3 { async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { let instant = Instant::now(); - let should_multipart = std::fs::metadata(path)?.len() > MULTIPART_UPLOAD_SIZE as u64; + // // TODO: Uncomment this when multipart is fixed + // let should_multipart = std::fs::metadata(path)?.len() > MULTIPART_UPLOAD_SIZE as u64; + + let should_multipart = false; let res = if should_multipart { - self._upload_multipart(key, path).await + // self._upload_multipart(key, path).await + // this branch will never get executed + Ok(()) } else { let bytes = tokio::fs::read(path).await?; let result = self.client.put(&key.into(), bytes.into()).await?; @@ -397,45 +401,48 @@ impl S3 { res } - async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { - let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2]; - let mut file = OpenOptions::new().read(true).open(path).await?; - - let (multipart_id, mut async_writer) = self.client.put_multipart(&key.into()).await?; - - let close_multipart = |err| async move { - log::error!("multipart upload failed. 
{:?}", err); - self.client - .abort_multipart(&key.into(), &multipart_id) - .await - }; - - loop { - match file.read(&mut buf).await { - Ok(len) => { - if len == 0 { - break; - } - if let Err(err) = async_writer.write_all(&buf[0..len]).await { - close_multipart(err).await?; - break; - } - if let Err(err) = async_writer.flush().await { - close_multipart(err).await?; - break; - } - } - Err(err) => { - close_multipart(err).await?; - break; - } - } - } - - async_writer.shutdown().await?; - - Ok(()) - } + // TODO: introduce parallel, multipart-uploads if required + // async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { + // let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2]; + // let mut file = OpenOptions::new().read(true).open(path).await?; + + // // let (multipart_id, mut async_writer) = self.client.put_multipart(&key.into()).await?; + // let mut async_writer = self.client.put_multipart(&key.into()).await?; + + // /* `abort_multipart()` has been removed */ + // // let close_multipart = |err| async move { + // // log::error!("multipart upload failed. {:?}", err); + // // self.client + // // .abort_multipart(&key.into(), &multipart_id) + // // .await + // // }; + + // loop { + // match file.read(&mut buf).await { + // Ok(len) => { + // if len == 0 { + // break; + // } + // if let Err(err) = async_writer.write_all(&buf[0..len]).await { + // // close_multipart(err).await?; + // break; + // } + // if let Err(err) = async_writer.flush().await { + // // close_multipart(err).await?; + // break; + // } + // } + // Err(err) => { + // // close_multipart(err).await?; + // break; + // } + // } + // } + + // async_writer.shutdown().await?; + + // Ok(()) + // } } #[async_trait] @@ -545,7 +552,7 @@ impl ObjectStorage for S3 { path: &RelativePath, resource: Bytes, ) -> Result<(), ObjectStorageError> { - self._put_object(path, resource) + self._put_object(path, resource.into()) .await .map_err(|err| ObjectStorageError::ConnectionError(Box::new(err)))?;