Skip to content

Commit

Permalink
Errors emitted in backend, () returned, recreated in podsync
Browse files Browse the repository at this point in the history
  • Loading branch information
bobrippling committed Feb 22, 2024
1 parent 2c820b1 commit 7d6fa0b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 35 deletions.
38 changes: 13 additions & 25 deletions src/backend/backend_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use crate::backend::FindError;

use crate::device::{DeviceAndSub, DeviceUpdate};
use crate::episode::{Episode, EpisodeRaw};
use crate::podsync::{Error, QueryEpisodes, Url};
use crate::podsync::{QueryEpisodes, Url};
use crate::subscription::SubscriptionChangesFromClient;
use crate::user::User;
use crate::Timestamp;

type Result<T> = std::result::Result<T, ()>;

static DB_URL: &str = "sqlite://pod.sql";

pub struct Backend(pub Pool<Sqlite>);
Expand Down Expand Up @@ -46,30 +48,28 @@ impl Backend {
}

impl Backend {
async fn transact<'t, T, R, F>(&self, transaction: T) -> Result<R, Error>
async fn transact<'t, T, R, F>(&self, transaction: T) -> Result<R>
where
T: FnOnce(Transaction<'t, Sqlite>) -> F,
F: Future<Output = Result<(Transaction<'t, Sqlite>, R), Error>>,
F: Future<Output = Result<(Transaction<'t, Sqlite>, R)>>,
{
let tx = self.0.begin().await.map_err(|e| {
error!("error beginning transaction: {:?}", e);
Error::Internal
})?;

// could probably pass &mut *tx here
let (tx, r) = transaction(tx).await?;

tx.commit().await.map_err(|e| {
error!("error committing transaction: {:?}", e);
Error::Internal
})?;

Ok(r)
}
}

impl Backend {
pub async fn find_user(&self, username: &str) -> Result<User, FindError> {
pub async fn find_user(&self, username: &str) -> std::result::Result<User, FindError> {
query_as!(
User,
"
Expand Down Expand Up @@ -110,7 +110,7 @@ impl Backend {
.is_ok()
}

pub async fn users_with_session(&self, session_id: &str) -> Result<Vec<User>, Error> {
pub async fn users_with_session(&self, session_id: &str) -> Result<Vec<User>> {
query_as!(
User,
"
Expand All @@ -124,13 +124,12 @@ impl Backend {
.await
.map_err(|e| {
error!("couldn't query for session {session_id}: {e:?}");
Error::Internal
})
}
}

impl Backend {
pub async fn devices_for_user(&self, username: &str) -> Result<Vec<DeviceAndSub>, Error> {
pub async fn devices_for_user(&self, username: &str) -> Result<Vec<DeviceAndSub>> {
query_as!(
DeviceAndSub,
r#"
Expand All @@ -147,7 +146,6 @@ impl Backend {
.await
.map_err(|e| {
error!("error selecting devices: {:?}", e);
Error::Internal
})
}

Expand All @@ -156,7 +154,7 @@ impl Backend {
username: &str,
device_id: &str,
update: DeviceUpdate,
) -> Result<(), Error> {
) -> Result<()> {
let caption = update.caption;
let r#type = update.r#type;
let type_default = r#type.clone().unwrap_or_default();
Expand Down Expand Up @@ -188,7 +186,6 @@ impl Backend {
.map(|_| ())
.map_err(|e| {
error!("error inserting device: {:?}", e);
Error::Internal
})
}
}
Expand All @@ -199,7 +196,7 @@ impl Backend {
username: &str,
device_id: &str,
since: Timestamp,
) -> Result<Vec<Url>, Error> {
) -> Result<Vec<Url>> {
query_as!(
Url,
r#"
Expand All @@ -222,7 +219,6 @@ impl Backend {
.await
.map_err(|e| {
error!("error selecting subscriptions: {e:?}");
Error::Internal
})
}

Expand All @@ -232,7 +228,7 @@ impl Backend {
device_id: &str,
changes: &SubscriptionChangesFromClient,
now: Timestamp,
) -> Result<(), Error> {
) -> Result<()> {
self.transact(|mut tx| async {
for url in &changes.remove {
query!(
Expand All @@ -254,7 +250,6 @@ impl Backend {
.await
.map_err(|e| {
error!("error deleting (updating) subscription: {e:?}");
Error::Internal
})?;
}

Expand All @@ -277,7 +272,6 @@ impl Backend {
.await
.map_err(|e| {
error!("error inserting subscription: {e:?}");
Error::Internal
})?;
}

Expand All @@ -296,11 +290,7 @@ impl Backend {
}

impl Backend {
pub async fn episodes(
&self,
username: &str,
query: &QueryEpisodes,
) -> Result<Vec<EpisodeRaw>, Error> {
pub async fn episodes(&self, username: &str, query: &QueryEpisodes) -> Result<Vec<EpisodeRaw>> {
let since = query.since.unwrap_or_else(Timestamp::zero);
let podcast_filter = &query.podcast;
let device_filter = &query.device;
Expand Down Expand Up @@ -332,7 +322,6 @@ impl Backend {
.await
.map_err(|e| {
error!("error selecting episodes: {e:?}");
Error::Internal
})
}

Expand All @@ -341,7 +330,7 @@ impl Backend {
username: &str,
now: Timestamp,
changes: Vec<Episode>,
) -> Result<(), Error> {
) -> Result<()> {
self.transact(|mut tx| async {
for change in changes {
let hash = change.hash();
Expand Down Expand Up @@ -421,7 +410,6 @@ impl Backend {
.await
.map_err(|e| {
error!("error querying mid-transaction: {:?}", e);
Error::Internal
})?;
}

Expand Down
44 changes: 34 additions & 10 deletions src/podsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ impl PodSync {
pub async fn authenticate(self: &Arc<Self>, session_id: SessionId) -> Result<PodSyncAuthed> {
let session_str = session_id.to_string();

let users = self.0.users_with_session(&session_str).await?;
let users = self
.0
.users_with_session(&session_str)
.await
.map_err(|()| Error::Internal)?;

match &users[..] {
[] => {
Expand Down Expand Up @@ -206,17 +210,26 @@ impl PodSyncAuthed<true> {
let username = &self.username;
trace!("{username} getting devices");

self.sync.0.devices_for_user(username).await.map(|devs| {
info!("{username}, {} devices", devs.len());
devs
})
self.sync
.0
.devices_for_user(username)
.await
.map(|devs| {
info!("{username}, {} devices", devs.len());
devs
})
.map_err(|()| Error::Internal)
}

pub async fn update_device(&self, device_id: &str, update: DeviceUpdate) -> Result<()> {
let username = &self.username;
info!("{username} updating device {device_id}: {update:?}");

self.sync.0.update_device(username, device_id, update).await
self.sync
.0
.update_device(username, device_id, update)
.await
.map_err(|()| Error::Internal)
}

pub async fn subscriptions(
Expand All @@ -232,7 +245,8 @@ impl PodSyncAuthed<true> {
.sync
.0
.subscriptions(username, device_id, since)
.await?;
.await
.map_err(|()| Error::Internal)?;

enum E {
Created(String),
Expand Down Expand Up @@ -293,7 +307,8 @@ impl PodSyncAuthed<true> {
self.sync
.0
.update_subscriptions(username, device_id, &changes, now)
.await?;
.await
.map_err(|()| Error::Internal)?;

Ok(UpdatedUrls {
timestamp: now,
Expand All @@ -315,7 +330,12 @@ impl PodSyncAuthed<true> {
query.podcast.as_deref().unwrap_or("<none>"),
);

let episodes = self.sync.0.episodes(username, &query).await?;
let episodes = self
.sync
.0
.episodes(username, &query)
.await
.map_err(|()| Error::Internal)?;

let latest = episodes.iter().filter_map(|ep| ep.modified).max();

Expand Down Expand Up @@ -365,7 +385,11 @@ impl PodSyncAuthed<true> {
let now = now()?;
let change_count = changes.len();

self.sync.0.update_episodes(username, now, changes).await?;
self.sync
.0
.update_episodes(username, now, changes)
.await
.map_err(|()| Error::Internal)?;

info!("{username} updated {change_count} episodes, timestamp {now}");

Expand Down

0 comments on commit 7d6fa0b

Please sign in to comment.