Skip to content

Commit

Permalink
Merge pull request #439 from helium/madninja/poc_pre_connect
Browse files Browse the repository at this point in the history
connect to ingest while signing
  • Loading branch information
madninja committed Aug 3, 2023
2 parents 5d50596 + af3ff93 commit 1d85293
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 35 deletions.
65 changes: 33 additions & 32 deletions src/beaconer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,23 @@ impl Beaconer {
.map_ok(|BeaconResp { powe, tmst }| (powe, tmst))
.await?;

let report = self
.mk_beacon_report(beacon.clone(), powe, tmst)
.inspect_err(|err| warn!(beacon_id, %err, "poc beacon report"))
.await?;

PocIotService::new(self.poc_ingest_uri.clone())
.submit_beacon(report)
.inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report",))
.inspect_ok(|_| info!(beacon_id, "poc beacon report submitted",))
.await?;
// Construct concurrent futures for connecting to the poc ingester and
// signing the report
let report_fut = self.mk_beacon_report(beacon.clone(), powe, tmst);
let service_fut = PocIotService::connect(self.poc_ingest_uri.clone());

match tokio::try_join!(report_fut, service_fut) {
Ok((report, mut poc_service)) => {
poc_service
.submit_beacon(report)
.inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report"))
.inspect_ok(|_| info!(beacon_id, "poc beacon report submitted"))
.await?
}
Err(err) => {
warn!(beacon_id, %err, "poc beacon report");
}
}

Ok(beacon)
}
Expand Down Expand Up @@ -237,30 +244,24 @@ impl Beaconer {
}
}

let report = match self.mk_witness_report(packet).await {
Ok(report) => report,
// Construct concurrent futures for connecting to the poc ingester and
// signing the report
let report_fut = self.mk_witness_report(packet);
let service_fut = PocIotService::connect(self.poc_ingest_uri.clone());

match tokio::try_join!(report_fut, service_fut) {
Ok((report, mut poc_service)) => {
let beacon_id = report.data.to_b64();
let _ = poc_service
.submit_witness(report)
.inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report"))
.inspect_ok(|_| info!(beacon_id, "poc witness report submitted"))
.await;
}
Err(err) => {
warn!(%err, "ignoring invalid witness report");
return;
warn!(%err, "poc witness report");
}
};

let _ = PocIotService::new(self.poc_ingest_uri.clone())
.submit_witness(report.clone())
.inspect_err(|err| {
info!(
beacon = report.data.to_b64(),
%err,
"submit poc witness report"
)
})
.inspect_ok(|_| {
info!(
beacon = report.data.to_b64(),
"poc witness report submitted"
)
})
.await;
}
}

/// Construct a next beacon time based on a fraction of the given interval.
Expand Down
7 changes: 4 additions & 3 deletions src/service/poc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ type PocIotClient = helium_proto::services::poc_lora::Client<Channel>;
pub struct PocIotService(PocIotClient);

impl PocIotService {
pub fn new(uri: Uri) -> Self {
pub async fn connect(uri: Uri) -> Result<Self> {
let channel = Endpoint::from(uri)
.connect_timeout(CONNECT_TIMEOUT)
.timeout(RPC_TIMEOUT)
.connect_lazy();
.connect()
.await?;
let client = services::poc_lora::Client::new(channel);
Self(client)
Ok(Self(client))
}

pub async fn submit_beacon(&mut self, req: LoraBeaconReportReqV1) -> Result {
Expand Down

0 comments on commit 1d85293

Please sign in to comment.