Skip to content

Commit

Permalink
refactor: split connection node from common for later endpoint
Browse files Browse the repository at this point in the history
Closes #99
  • Loading branch information
bbangert committed Feb 13, 2019
1 parent 0cf3a92 commit a36f94c
Show file tree
Hide file tree
Showing 27 changed files with 93 additions and 152 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock → autopush-common/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 4 additions & 26 deletions Cargo.toml → autopush-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "autopush"
version = "1.53.0"
name = "autopush_common"
version = "1.0.0"
authors = [
"Ben Bangert <ben@groovie.org>",
"JR Conlin <jrconlin@mozilla.com>",
Expand All @@ -10,35 +10,20 @@ authors = [
edition = "2018"

[lib]
name = "autopush"

[[bin]]
name = "autopush_rs"
path = "src/main.rs"
name = "autopush_common"

[dependencies]
base64 = "0.10.0"
bytes = "0.4.11"
cadence = "0.16.0"
chan-signal = "0.3.2"
chrono = "0.4.6"
config = "0.9.2"
docopt = "1.0.2"
env_logger = { version = "0.6.0", default-features = false }
error-chain = "0.12.0"
fernet = "0.1.0"
futures = "0.1.25"
futures-backoff = "0.1.0"
hex = "0.3.2"
httparse = "1.3.3"
# XXX: pin to hyper 0.11 for now: 0.12 has many changes..
hyper = "0.11.27"
lazy_static = "1.2.0"
libc = "0.2.46"
log = { version = "0.4.6", features = ["max_level_info", "release_max_level_info"] }
matches = "0.1.8"
mozsvc-common = "0.1.0"
openssl = "0.10.16"
rand = "0.6.3"
regex = "1.1.0"
reqwest = "0.9.5"
Expand All @@ -48,22 +33,15 @@ rusoto_dynamodb = "0.36.0"
sentry = { version = "0.13.0", features = ["with_error_chain"] }
serde = "1.0.84"
serde_derive = "1.0.84"
serde_dynamodb = { git = "https://github.com/mockersf/serde_dynamodb", rev = "240974d591466c4f9a0831162a28d690f2f6e51a" }
serde_dynamodb = "0.2.1"
serde_json = "1.0.34"
slog = { version = "2.4.1", features = ["max_level_trace", "release_max_level_info"] }
slog-async = "2.3.0"
slog-term = "2.4.0"
slog-mozlog-json = "0.1.0"
slog-scope = "4.1.1"
slog-stdlog = "3.0.2"
# state_machine_future = { version = "0.1.6", features = ["debug_code_generation"] }
state_machine_future = "0.2.0"
time = "0.1.41"
tokio-core = "0.1.17"
tokio-io = "0.1.10"
tokio-openssl = "0.3.0"
tokio-service = "0.1.0"
tokio-tungstenite = { version = "0.6.0", default-features = false }
tungstenite = { version = "0.6.1", default-features = false }
uuid = { version = "0.7.1", features = ["serde", "v4"] }
# XXX: pin woothee until >= 0.8.1
Expand Down
File renamed without changes.
File renamed without changes.
24 changes: 9 additions & 15 deletions src/db/mod.rs → autopush-common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ mod util;

use crate::errors::*;
use crate::protocol::Notification;
use crate::server::{Server, ServerOptions};
use crate::util::timing::sec_since_epoch;

use self::commands::{
Expand Down Expand Up @@ -58,7 +57,6 @@ pub struct CheckStorageResponse {

pub enum RegisterResponse {
Success { endpoint: String },

Error { error_msg: String, status: u32 },
}

Expand All @@ -71,7 +69,11 @@ pub struct DynamoStorage {
}

impl DynamoStorage {
pub fn from_opts(opts: &ServerOptions, metrics: StatsdClient) -> Result<Self> {
pub fn from_opts(
message_table_name: &str,
router_table_name: &str,
metrics: StatsdClient,
) -> Result<Self> {
let ddb: Box<dyn DynamoDb> = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") {
Box::new(DynamoDbClient::new_with(
HttpClient::new().chain_err(|| "TLS initialization error")?,
Expand All @@ -86,7 +88,7 @@ impl DynamoStorage {
};
let ddb = Rc::new(ddb);

let mut message_table_names = list_message_tables(&ddb, &opts._message_table_name)
let mut message_table_names = list_message_tables(&ddb, &message_table_name)
.map_err(|_| "Failed to locate message tables")?;
// Valid message months are the current and last 2 months
message_table_names.sort_unstable_by(|a, b| b.cmp(a));
Expand All @@ -100,7 +102,7 @@ impl DynamoStorage {
Ok(Self {
ddb,
metrics: Rc::new(metrics),
router_table_name: opts._router_table_name.clone(),
router_table_name: router_table_name.to_owned(),
message_table_names,
current_message_month,
})
Expand Down Expand Up @@ -194,23 +196,15 @@ impl DynamoStorage {

pub fn register(
&self,
srv: &Rc<Server>,
uaid: &Uuid,
channel_id: &Uuid,
message_month: &str,
endpoint: &str,
key: Option<String>,
) -> MyFuture<RegisterResponse> {
let ddb = self.ddb.clone();
let endpoint = match srv.make_endpoint(uaid, channel_id, key) {
Ok(result) => result,
Err(_) => {
return Box::new(future::ok(RegisterResponse::Error {
error_msg: "Failed to generate endpoint".to_string(),
status: 400,
}));
}
};
let mut chids = HashSet::new();
let endpoint = endpoint.to_owned();
chids.insert(channel_id.to_hyphenated().to_string());
let response = commands::save_channels(ddb, uaid, chids, message_month)
.and_then(move |_| future::ok(RegisterResponse::Success { endpoint }))
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
16 changes: 16 additions & 0 deletions autopush-common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#![recursion_limit = "1024"]

#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate slog;
#[macro_use]
extern crate slog_scope;

#[macro_use]
pub mod db;
pub mod errors;
pub mod logging;
pub mod protocol;
#[macro_use]
pub mod util;
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
40 changes: 21 additions & 19 deletions src/client.rs → autopush/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
//! Management of connected clients to a WebPush server
//!
//! This module is a pretty heavy work in progress. The intention is that
//! this'll house all the various state machine transitions and state management
//! of connected clients. Note that it's expected there'll be a lot of connected
//! clients, so this may appears relatively heavily optimized!
use std::cell::RefCell;
use std::mem;
use std::rc::Rc;
use std::time::Duration;

use cadence::{prelude::*, StatsdClient};
use error_chain::ChainedError;
use futures::future::Either;
Expand All @@ -22,16 +12,21 @@ use rusoto_dynamodb::UpdateItemOutput;
use sentry;
use sentry::integrations::error_chain::event_from_error_chain;
use state_machine_future::{transition, RentToOwn, StateMachineFuture};
use std::cell::RefCell;
use std::mem;
use std::rc::Rc;
use std::time::Duration;
use tokio_core::reactor::Timeout;
use uuid::Uuid;
use woothee::parser::Parser;

use crate::db::{CheckStorageResponse, HelloResponse, RegisterResponse};
use crate::errors::*;
use crate::protocol::{ClientMessage, Notification, ServerMessage, ServerNotification};
use autopush_common::db::{CheckStorageResponse, HelloResponse, RegisterResponse};
use autopush_common::errors::*;
use autopush_common::protocol::{ClientMessage, Notification, ServerMessage, ServerNotification};
use autopush_common::util::megaphone::{Broadcast, BroadcastSubs};
use autopush_common::util::{ms_since_epoch, parse_user_agent, sec_since_epoch};

use crate::server::Server;
use crate::util::megaphone::{Broadcast, BroadcastSubs};
use crate::util::{ms_since_epoch, parse_user_agent, sec_since_epoch};

// Created and handed to the AutopushServer
pub struct RegisteredClient {
Expand Down Expand Up @@ -862,10 +857,17 @@ where
let uaid = webpush.uaid;
let message_month = webpush.message_month.clone();
let srv = data.srv.clone();
let fut = data
.srv
.ddb
.register(&srv, &uaid, &channel_id, &message_month, key);
let fut = match srv.make_endpoint(&uaid, &channel_id, key.clone()) {
Ok(endpoint) => {
data.srv
.ddb
.register(&uaid, &channel_id, &message_month, &endpoint, key)
}
Err(_) => Box::new(future::ok(RegisterResponse::Error {
error_msg: "Failed to generate endpoint".to_string(),
status: 400,
})),
};
transition!(AwaitRegister {
channel_id,
response: fut,
Expand Down
File renamed without changes.
18 changes: 13 additions & 5 deletions src/main.rs → autopush/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
#[macro_use]
extern crate slog;
#[macro_use]
extern crate slog_scope;
#[macro_use]
extern crate serde_derive;

use chan_signal;

use std::env;

use chan_signal::Signal;
use docopt::Docopt;

use autopush::errors::{Result, ResultExt};
use autopush::server::{AutopushServer, ServerOptions};
use autopush::settings::Settings;
use autopush_common::errors::{Result, ResultExt};

mod client;
mod http;
mod server;
mod settings;

use crate::server::{AutopushServer, ServerOptions};
use crate::settings::Settings;

const USAGE: &'static str = "
Usage: autopush_rs [options]
Expand Down
3 changes: 2 additions & 1 deletion src/server/dispatch.rs → autopush/src/server/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use httparse;
use tokio_core::net::TcpStream;
use tokio_io::AsyncRead;

use crate::errors::*;
use autopush_common::errors::*;

use crate::server::tls::MaybeTlsStream;
use crate::server::webpush_io::WebpushIo;

Expand Down
3 changes: 2 additions & 1 deletion src/server/metrics.rs → autopush/src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::net::UdpSocket;

use cadence::{BufferedUdpMetricSink, NopMetricSink, QueuingMetricSink, StatsdClient};

use crate::errors::*;
use autopush_common::errors::*;

use crate::server::ServerOptions;

/// Create a cadence StatsdClient from the given options
Expand Down
31 changes: 18 additions & 13 deletions src/server/mod.rs → autopush/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,24 @@ use tungstenite::handshake::server::Request;
use tungstenite::Message;
use uuid::Uuid;

use crate::client::{Client, RegisteredClient};
use crate::db::DynamoStorage;
use crate::errors::*;
use crate::errors::{Error, Result};
use crate::http;
use crate::logging;
use crate::protocol::{
use autopush_common::db::DynamoStorage;
use autopush_common::errors::*;
use autopush_common::errors::{Error, Result};
use autopush_common::logging;
use autopush_common::protocol::{
BroadcastValue, ClientMessage, Notification, ServerMessage, ServerNotification,
};
use autopush_common::util::megaphone::{
Broadcast, BroadcastChangeTracker, BroadcastSubs, BroadcastSubsInit, MegaphoneAPIResponse,
};
use autopush_common::util::{timeout, RcObject};

use crate::client::{Client, RegisteredClient};
use crate::http;
use crate::server::dispatch::{Dispatch, RequestType};
use crate::server::metrics::metrics_from_opts;
use crate::server::webpush_io::WebpushIo;
use crate::settings::Settings;
use crate::util::megaphone::{
Broadcast, BroadcastChangeTracker, BroadcastSubs, BroadcastSubsInit, MegaphoneAPIResponse,
};
use crate::util::{timeout, RcObject};

mod dispatch;
mod metrics;
Expand Down Expand Up @@ -294,7 +295,11 @@ impl Server {
let srv = Rc::new(Server {
opts: opts.clone(),
broadcaster: RefCell::new(broadcaster),
ddb: DynamoStorage::from_opts(opts, metrics.clone())?,
ddb: DynamoStorage::from_opts(
&opts._message_table_name,
&opts._router_table_name,
metrics.clone(),
)?,
uaids: RefCell::new(HashMap::new()),
open_connections: Cell::new(0),
handle: core.handle(),
Expand Down Expand Up @@ -963,7 +968,7 @@ pub fn write_version_file(socket: WebpushIo) -> MyFuture<()> {
write_json(
socket,
StatusCode::Ok,
serde_json::Value::from(include_str!("../../version.json")),
serde_json::Value::from(include_str!("../../../version.json")),
)
}

Expand Down
Loading

0 comments on commit a36f94c

Please sign in to comment.