add Aurora serverless (data api) driver #866

Closed
wants to merge 15 commits
553 changes: 535 additions & 18 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -50,7 +50,7 @@ offline = [ "sqlx-macros/offline", "sqlx-core/offline" ]

# intended mainly for CI and docs
all = [ "tls", "all-databases", "all-types" ]
all-databases = [ "mysql", "sqlite", "postgres", "mssql", "any" ]
all-databases = [ "mysql", "sqlite", "postgres", "mssql", "any", "aurora" ]
all-types = [ "bigdecimal", "decimal", "json", "time", "chrono", "ipnetwork", "uuid", "bit-vec" ]

# previous runtimes, available as features for error messages better than just
@@ -79,6 +79,7 @@ postgres = [ "sqlx-core/postgres", "sqlx-macros/postgres" ]
mysql = [ "sqlx-core/mysql", "sqlx-macros/mysql" ]
sqlite = [ "sqlx-core/sqlite", "sqlx-macros/sqlite" ]
mssql = [ "sqlx-core/mssql", "sqlx-macros/mssql" ]
aurora = [ "sqlx-core/aurora" ]

# types
bigdecimal = [ "sqlx-core/bigdecimal", "sqlx-macros/bigdecimal" ]
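With these feature additions in place, a downstream project would opt in to the new driver alongside one of the existing runtime/TLS features, roughly as follows (a sketch of a consumer manifest, not part of this PR; the version number is illustrative):

# Consumer Cargo.toml (sketch)
[dependencies]
sqlx = { version = "0.4", features = [ "aurora", "runtime-tokio-rustls" ] }
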
11 changes: 8 additions & 3 deletions sqlx-core/Cargo.toml
@@ -20,11 +20,12 @@ default = [ "migrate" ]
migrate = [ "sha2", "crc" ]

# databases
all-databases = [ "postgres", "mysql", "sqlite", "mssql", "any" ]
all-databases = [ "postgres", "mysql", "sqlite", "mssql", "aurora", "any" ]
postgres = [ "md-5", "sha2", "base64", "sha-1", "rand", "hmac", "futures-channel/sink", "futures-util/sink" ]
mysql = [ "sha-1", "sha2", "generic-array", "num-bigint", "base64", "digest", "rand", "rsa" ]
sqlite = [ "libsqlite3-sys" ]
mssql = [ "uuid", "encoding_rs", "regex" ]
aurora = [ "rusoto_core", "rusoto_rds_data", "regex" ]
any = []

# types
@@ -46,8 +47,8 @@ runtime-tokio-rustls = [ "sqlx-rt/runtime-tokio-rustls", "_tls-rustls", "_rt-tok
_rt-actix = []
_rt-async-std = []
_rt-tokio = []
-_tls-native-tls = []
-_tls-rustls = [ "rustls", "webpki", "webpki-roots" ]
+_tls-native-tls = [ "rusoto_core/native-tls", "rusoto_rds_data/native-tls" ]
+_tls-rustls = [ "rustls", "webpki", "webpki-roots", "rusoto_core/rustls", "rusoto_rds_data/rustls" ]
Comment on lines +50 to +51
@tarkah (Author), Dec 1, 2020:

It seems like this causes the rusoto deps to be included even when the "aurora" feature isn't enabled. There is a rustlang tracking issue for weak dependency features that could solve this, but it isn't ready yet. Is there another workaround so we don't need two separate feature gates for aurora, i.e. "aurora-native-tls" / "aurora-rustls"?

A contributor replied:

Cross-reference: rust-lang/cargo#8818


# support offline/decoupled building (enables serialization of `Describe`)
offline = [ "serde", "either/serde" ]
@@ -107,3 +108,7 @@ webpki-roots = { version = "0.20.0", optional = true }
whoami = "0.9.0"
stringprep = "0.1.2"
lru-cache = "0.1.2"

+# Aurora dependencies
+rusoto_core = { version = "0.45.0", default-features = false, optional = true }
+rusoto_rds_data = { version = "0.45.0", default-features = false, features = ["serialize_structs", "deserialize_structs"], optional = true }
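
On the weak-dependencies question in the review comment above: the syntax tracked in rust-lang/cargo#8818 would let the TLS features forward to the rusoto features only when the optional rusoto crates are already enabled, roughly like this (a sketch of that future syntax, which was not yet available when this PR was written):

_tls-native-tls = [ "rusoto_core?/native-tls", "rusoto_rds_data?/native-tls" ]
_tls-rustls = [ "rustls", "webpki", "webpki-roots", "rusoto_core?/rustls", "rusoto_rds_data?/rustls" ]
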
27 changes: 27 additions & 0 deletions sqlx-core/src/aurora/arguments.rs
@@ -0,0 +1,27 @@
use crate::arguments::Arguments;
use crate::aurora::Aurora;
use crate::encode::Encode;
use crate::types::Type;

use rusoto_rds_data::SqlParameter;

/// Implementation of [`Arguments`] for Aurora.
#[derive(Default)]
pub struct AuroraArguments {
    pub(crate) parameters: Vec<SqlParameter>,
}

impl<'q> Arguments<'q> for AuroraArguments {
    type Database = Aurora;

    fn reserve(&mut self, additional: usize, _size: usize) {
        self.parameters.reserve(additional);
    }

    fn add<T>(&mut self, value: T)
    where
        T: Encode<'q, Self::Database> + Type<Self::Database>,
    {
        let _ = value.encode(&mut self.parameters);
    }
}
29 changes: 29 additions & 0 deletions sqlx-core/src/aurora/column.rs
@@ -0,0 +1,29 @@
use crate::aurora::type_info::AuroraTypeInfo;
use crate::aurora::Aurora;
use crate::column::Column;
use crate::ext::ustr::UStr;

#[derive(Debug, Clone)]
pub struct AuroraColumn {
    pub(crate) ordinal: usize,
    pub(crate) name: UStr,
    pub(crate) type_info: AuroraTypeInfo,
}

impl crate::column::private_column::Sealed for AuroraColumn {}

impl Column for AuroraColumn {
    type Database = Aurora;

    fn ordinal(&self) -> usize {
        self.ordinal
    }

    fn name(&self) -> &str {
        &*self.name
    }

    fn type_info(&self) -> &AuroraTypeInfo {
        &self.type_info
    }
}
189 changes: 189 additions & 0 deletions sqlx-core/src/aurora/connection/executor.rs
@@ -0,0 +1,189 @@
use super::AuroraConnection;
use crate::aurora::error::AuroraDatabaseError;
use crate::aurora::statement::AuroraStatementMetadata;
use crate::aurora::{
    Aurora, AuroraArguments, AuroraColumn, AuroraDone, AuroraRow, AuroraStatement, AuroraTypeInfo,
};
use crate::describe::Describe;
use crate::error::Error;
use crate::executor::{Execute, Executor};
use crate::ext::ustr::UStr;
use crate::logger::QueryLogger;

use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_core::Stream;
use futures_util::stream;
use futures_util::{pin_mut, TryStreamExt};
use rusoto_rds_data::{ExecuteStatementRequest, ExecuteStatementResponse, RdsData};
use std::borrow::Cow;
use std::sync::Arc;

impl AuroraConnection {
    async fn run<'e, 'c: 'e, 'q: 'e>(
        &'c mut self,
        query: &'q str,
        arguments: Option<AuroraArguments>,
    ) -> Result<impl Stream<Item = Result<Either<AuroraDone, AuroraRow>, Error>> + 'e, Error> {
        let mut logger = QueryLogger::new(query, self.log_settings.clone());

        // TODO: is this correct?
        let transaction_id = self.transaction_ids.last().cloned();

        let request = ExecuteStatementRequest {
            sql: query.to_owned(),
            parameters: arguments.map(|m| m.parameters),
            resource_arn: self.resource_arn.clone(),
            secret_arn: self.secret_arn.clone(),
            database: self.database.clone(),
            schema: self.schema.clone(),
            transaction_id,
            include_result_metadata: Some(true),
            ..Default::default()
        };

        let ExecuteStatementResponse {
            column_metadata,
            number_of_records_updated,
            records,
            ..
        } = self
            .client
            .execute_statement(request)
            .await
            .map_err(AuroraDatabaseError)?;

        let rows_affected = number_of_records_updated.unwrap_or_default() as u64;
        let column_metadata = column_metadata.unwrap_or_default();
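
        // The Data API returns the complete result set in a single response; there is
        // no server-side cursor, so all rows are materialized below and handed back to
        // the caller as a stream via `stream::iter`.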

        let mut rows = records
            .unwrap_or_default()
            .into_iter()
            .map(|fields| {
                let columns: Vec<_> = fields
                    .iter()
                    .zip(&column_metadata)
                    .enumerate()
                    .map(|(ordinal, (field, metadata))| AuroraColumn {
                        ordinal,
                        name: UStr::new(metadata.name.as_deref().unwrap_or_default()),
                        type_info: AuroraTypeInfo::from(field),
                    })
                    .collect();

                let column_names = columns
                    .iter()
                    .map(|column| (column.name.clone(), column.ordinal))
                    .collect();
                let parameters = columns.iter().map(|column| column.type_info).collect();

                let metadata = Arc::new(AuroraStatementMetadata {
                    columns,
                    column_names,
                    parameters,
                });

                let row = AuroraRow { fields, metadata };

                logger.increment_rows();

                Ok(Either::Right(row))
            })
            .collect::<Vec<_>>();

        rows.push(Ok(Either::Left(AuroraDone { rows_affected })));

        logger.finish();

        Ok(stream::iter(rows))
    }
}

impl<'c> Executor<'c> for &'c mut AuroraConnection {
    type Database = Aurora;

    fn fetch_many<'e, 'q: 'e, E: 'q>(
        self,
        mut query: E,
    ) -> BoxStream<'e, Result<Either<AuroraDone, AuroraRow>, Error>>
    where
        'c: 'e,
        E: Execute<'q, Self::Database>,
    {
        let sql = query.sql();
        let arguments = query.take_arguments();

        // TODO: implement statement caching?
        //let metadata = query.statement();
        //let persistent = query.persistent();

        Box::pin(try_stream! {
            let s = self.run(sql, arguments).await?;
            pin_mut!(s);

            while let Some(v) = s.try_next().await? {
                r#yield!(v);
            }

            Ok(())
        })
    }

    fn fetch_optional<'e, 'q: 'e, E: 'q>(
        self,
        query: E,
    ) -> BoxFuture<'e, Result<Option<AuroraRow>, Error>>
    where
        'c: 'e,
        E: Execute<'q, Self::Database>,
    {
        let mut s = self.fetch_many(query);

        Box::pin(async move {
            while let Some(v) = s.try_next().await? {
                if let Either::Right(r) = v {
                    return Ok(Some(r));
                }
            }

            Ok(None)
        })
    }

    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        _parameters: &[AuroraTypeInfo],
    ) -> BoxFuture<'e, Result<AuroraStatement<'q>, Error>>
    where
        'c: 'e,
    {
        Box::pin(async move {
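            // The Data API has no server-side prepare step, so preparing a statement
            // just wraps the SQL with empty metadata.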
            Ok(AuroraStatement {
                sql: Cow::Borrowed(sql),
                metadata: Default::default(),
            })
        })
    }

    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>>
    where
        'c: 'e,
    {
        Box::pin(async move {
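            // No statement metadata is available without a real prepare step, so
            // describe currently reports empty column and nullability information.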
            let metadata: AuroraStatementMetadata = Default::default();

            let nullable = Vec::with_capacity(metadata.columns.len());

            Ok(Describe {
                nullable,
                columns: metadata.columns,
                parameters: None,
            })
        })
    }
}
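
For context, a caller drives this Executor implementation through the usual sqlx query machinery; the sketch below shows the shape of the stream it produces (one Either::Right per row, then a single Either::Left with the AuroraDone summary). It is written as if it lived in the same module as the code above, reusing its imports; how an AuroraConnection is constructed is defined elsewhere in this PR and is simply assumed here.

// Sketch only: assumes an already-constructed AuroraConnection and the imports
// from the file above (Executor, Error, Either, TryStreamExt).
async fn count_rows(conn: &mut AuroraConnection) -> Result<u64, Error> {
    // A plain SQL string works as a query because `&str` implements `Execute`.
    let mut stream = conn.fetch_many("SELECT id, name FROM users");

    let mut rows = 0u64;
    while let Some(item) = stream.try_next().await? {
        match item {
            // One `Either::Right` per record returned by the Data API.
            Either::Right(_row) => rows += 1,
            // A single trailing `Either::Left` carrying `rows_affected`.
            Either::Left(_done) => {}
        }
    }

    Ok(rows)
}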