Skip to content

Commit

Permalink
Merge pull request #2879 from jarhodes314/enable-bridge-for-robot-tests
Browse files Browse the repository at this point in the history
Fix remaining test failures from built-in bridge
  • Loading branch information
jarhodes314 committed May 31, 2024
2 parents 08577c9 + 299627a commit ba2b00a
Show file tree
Hide file tree
Showing 22 changed files with 312 additions and 136 deletions.
32 changes: 22 additions & 10 deletions crates/core/c8y_api/src/smartrest/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ pub fn service_creation_message(
ancestors: &[String],
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
service_creation_message_payload(service_id, service_name, service_type, service_status)?,
))
}

/// Create a SmartREST message for creating a service on device.
/// The provided ancestors list must contain all the parents of the given service
/// starting from its immediate parent device.
pub fn service_creation_message_payload(
service_id: &str,
service_name: &str,
service_type: &str,
service_status: &str,
) -> Result<String, InvalidValueError> {
// TODO: most of this noise can be eliminated by implementing `Serialize`/`Deserialize` for smartrest format
if service_id.is_empty() {
return Err(InvalidValueError {
Expand All @@ -94,16 +109,13 @@ pub fn service_creation_message(
});
}

Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
fields_to_csv_string(&[
"102",
service_id,
service_type,
service_name,
service_status,
]),
))
Ok(fields_to_csv_string(&[
"102",
service_id,
service_type,
service_name,
service_status,
]))
}

/// Create a SmartREST message for updating service status.
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge/src/cli/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use tedge_config::system_services::SystemService;

#[derive(Copy, Clone, Debug, strum_macros::Display, strum_macros::IntoStaticStr)]
#[derive(Copy, Clone, Debug, strum_macros::Display, strum_macros::IntoStaticStr, PartialEq, Eq)]
pub enum Cloud {
#[strum(serialize = "Cumulocity")]
C8y,
Expand Down
74 changes: 55 additions & 19 deletions crates/core/tedge/src/cli/connect/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,19 +502,25 @@ fn new_bridge(
}
}

if let Err(err) =
write_generic_mosquitto_config_to_file(config_location, common_mosquitto_config)
{
// We want to preserve previous errors and therefore discard result of this function.
let _ = clean_up(config_location, bridge_config);
return Err(err);
}

if bridge_config.bridge_location == BridgeLocation::Mosquitto {
println!("Saving configuration for requested bridge.\n");

if let Err(err) =
write_bridge_config_to_file(config_location, bridge_config, common_mosquitto_config)
{
if let Err(err) = write_bridge_config_to_file(config_location, bridge_config) {
// We want to preserve previous errors and therefore discard result of this function.
let _ = clean_up(config_location, bridge_config);
return Err(err);
}
} else {
println!("Deleting mosquitto bridge configuration in favour of built-in bridge\n");
clean_up(config_location, bridge_config)?;
use_built_in_bridge(config_location, bridge_config)?;
}

if let Err(err) = service_manager_result {
Expand Down Expand Up @@ -543,18 +549,7 @@ fn new_bridge(
Ok(())
}

fn restart_mosquitto(
bridge_config: &BridgeConfig,
service_manager: &dyn SystemServiceManager,
config_location: &TEdgeConfigLocation,
) -> Result<(), ConnectError> {
println!("Restarting mosquitto service.\n");

if let Err(err) = service_manager.stop_service(SystemService::Mosquitto) {
clean_up(config_location, bridge_config)?;
return Err(err.into());
}

pub fn chown_certificate_and_key(bridge_config: &BridgeConfig) {
let (user, group) = match bridge_config.bridge_location {
BridgeLocation::BuiltIn => ("tedge", "tedge"),
BridgeLocation::Mosquitto => (crate::BROKER_USER, crate::BROKER_GROUP),
Expand All @@ -570,6 +565,21 @@ fn restart_mosquitto(
warn!("Failed to change ownership of {path} to {user}:{group}: {err}");
}
}
}

fn restart_mosquitto(
bridge_config: &BridgeConfig,
service_manager: &dyn SystemServiceManager,
config_location: &TEdgeConfigLocation,
) -> Result<(), ConnectError> {
println!("Restarting mosquitto service.\n");

if let Err(err) = service_manager.stop_service(SystemService::Mosquitto) {
clean_up(config_location, bridge_config)?;
return Err(err.into());
}

chown_certificate_and_key(bridge_config);

if let Err(err) = service_manager.restart_service(SystemService::Mosquitto) {
clean_up(config_location, bridge_config)?;
Expand Down Expand Up @@ -597,7 +607,7 @@ fn enable_software_management(

// To preserve error chain and not discard other errors we need to ignore error here
// (don't use '?' with the call to this function to preserve original error).
fn clean_up(
pub fn clean_up(
config_location: &TEdgeConfigLocation,
bridge_config: &BridgeConfig,
) -> Result<(), ConnectError> {
Expand All @@ -606,6 +616,19 @@ fn clean_up(
Ok(())
}

pub fn use_built_in_bridge(
config_location: &TEdgeConfigLocation,
bridge_config: &BridgeConfig,
) -> Result<(), ConnectError> {
let path = get_bridge_config_file_path(config_location, bridge_config);
std::fs::write(
path,
"# This file is left empty as the built-in bridge is enabled",
)
.or_else(ok_if_not_found)?;
Ok(())
}

fn bridge_config_exists(
config_location: &TEdgeConfigLocation,
bridge_config: &BridgeConfig,
Expand All @@ -619,9 +642,8 @@ fn bridge_config_exists(
Ok(())
}

fn write_bridge_config_to_file(
fn write_generic_mosquitto_config_to_file(
config_location: &TEdgeConfigLocation,
bridge_config: &BridgeConfig,
common_mosquitto_config: &CommonMosquittoConfig,
) -> Result<(), ConnectError> {
let dir_path = config_location
Expand All @@ -637,6 +659,20 @@ fn write_bridge_config_to_file(
common_mosquitto_config.serialize(&mut common_draft)?;
common_draft.persist()?;

Ok(())
}

fn write_bridge_config_to_file(
config_location: &TEdgeConfigLocation,
bridge_config: &BridgeConfig,
) -> Result<(), ConnectError> {
let dir_path = config_location
.tedge_config_root_path
.join(TEDGE_BRIDGE_CONF_DIR_PATH);

// This will forcefully create directory structure if it doesn't exist, we should find better way to do it, maybe config should deal with it?
create_directories(dir_path)?;

let config_path = get_bridge_config_file_path(config_location, bridge_config);
let mut config_draft = DraftFile::new(config_path)?.with_mode(0o644);
bridge_config.serialize(&mut config_draft)?;
Expand Down
8 changes: 2 additions & 6 deletions crates/core/tedge/src/cli/disconnect/disconnect_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,8 @@ impl DisconnectBridgeCommand {
// If bridge config file was not found we assume that the bridge doesn't exist,
// We finish early returning exit code 0.
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
if self.built_in_bridge {
Ok(())
} else {
println!("Bridge doesn't exist. Operation finished!");
Err(DisconnectBridgeError::BridgeFileDoesNotExist)
}
println!("Bridge doesn't exist. Operation finished!");
Err(DisconnectBridgeError::BridgeFileDoesNotExist)
}

Err(e) => Err(DisconnectBridgeError::FileOperationFailed(
Expand Down
30 changes: 25 additions & 5 deletions crates/core/tedge/src/cli/refresh_bridges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tedge_config::TEdgeConfigLocation;
use super::common::Cloud;
use super::connect::ConnectError;
use crate::bridge::BridgeConfig;
use crate::bridge::BridgeLocation;
use crate::bridge::CommonMosquittoConfig;
use crate::bridge::TEDGE_BRIDGE_CONF_DIR_PATH;
use crate::command::BuildContext;
Expand All @@ -28,19 +29,38 @@ impl Command for RefreshBridgesCmd {
fn execute(&self) -> anyhow::Result<()> {
let clouds = established_bridges(&self.config_location);

if clouds.is_empty() {
if clouds.is_empty() && !self.config.mqtt.bridge.built_in {
println!("No bridges to refresh.");
return Ok(());
}

let common_mosquitto_config = CommonMosquittoConfig::from_tedge_config(&self.config);
common_mosquitto_config.save(&self.config_location)?;

for cloud in clouds {
println!("Refreshing bridge {cloud}");
if !self.config.mqtt.bridge.built_in {
for cloud in &clouds {
println!("Refreshing bridge {cloud}");

let bridge_config = super::connect::bridge_config(&self.config, cloud)?;
refresh_bridge(&bridge_config, &self.config_location)?;
let bridge_config = super::connect::bridge_config(&self.config, *cloud)?;
refresh_bridge(&bridge_config, &self.config_location)?;
}
}

for cloud in [Cloud::Aws, Cloud::Azure, Cloud::C8y] {
// (attempt to) reassert ownership of the certificate and key
// This is necessary when upgrading from the mosquitto bridge to the built-in bridge
if let Ok(bridge_config) = super::connect::bridge_config(&self.config, cloud) {
super::connect::chown_certificate_and_key(&bridge_config);

if bridge_config.bridge_location == BridgeLocation::BuiltIn
&& clouds.contains(&cloud)
{
println!(
"Deleting mosquitto bridge configuration for {cloud} in favour of built-in bridge"
);
super::connect::use_built_in_bridge(&self.config_location, &bridge_config)?;
}
}
}

println!("Restarting mosquitto service.\n");
Expand Down
103 changes: 94 additions & 9 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ use tedge_config::TEdgeConfig;
use tedge_downloader_ext::DownloaderActor;
use tedge_file_system_ext::FsWatchActorBuilder;
use tedge_http_ext::HttpActor;
use tedge_mqtt_bridge::rumqttc::LastWill;
use tedge_mqtt_bridge::use_key_and_cert;
use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tedge_mqtt_bridge::QoS;
use tedge_mqtt_ext::MqttActorBuilder;
use tedge_timer_ext::TimerActor;
use tedge_uploader_ext::UploaderActor;
Expand Down Expand Up @@ -72,7 +74,36 @@ impl TEdgeComponent for CumulocityMapper {
tc.forward_from_remote(topic, local_prefix.clone(), "")?;
}

tc.forward_from_local("#", local_prefix, "")?;
// Templates
tc.forward_from_local("s/ut/#", local_prefix.clone(), "")?;

// Static templates
tc.forward_from_local("s/us", local_prefix.clone(), "")?;
tc.forward_from_local("s/us/#", local_prefix.clone(), "")?;
tc.forward_from_local("t/us/#", local_prefix.clone(), "")?;
tc.forward_from_local("q/us/#", local_prefix.clone(), "")?;
tc.forward_from_local("c/us/#", local_prefix.clone(), "")?;

// SmartREST2
tc.forward_from_local("s/uc/#", local_prefix.clone(), "")?;
tc.forward_from_local("t/uc/#", local_prefix.clone(), "")?;
tc.forward_from_local("q/uc/#", local_prefix.clone(), "")?;
tc.forward_from_local("c/uc/#", local_prefix.clone(), "")?;

// c8y JSON
tc.forward_from_local(
"inventory/managedObjects/update/#",
local_prefix.clone(),
"",
)?;
tc.forward_from_local(
"measurement/measurements/create/#",
local_prefix.clone(),
"",
)?;
tc.forward_from_local("event/events/create/#", local_prefix.clone(), "")?;
tc.forward_from_local("alarm/alarms/create/#", local_prefix.clone(), "")?;
tc.forward_from_local("s/uat", local_prefix.clone(), "")?;

let c8y = tedge_config.c8y.mqtt.or_config_not_set()?;
let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new(
Expand All @@ -89,14 +120,68 @@ impl TEdgeComponent for CumulocityMapper {
&tedge_config,
)?;

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
c8y_mapper_config.bridge_service_name(),
tc,
cloud_config,
)
.await;
runtime.spawn(bridge_actor).await?;
let main_device_xid: EntityExternalId =
tedge_config.device.id.try_read(&tedge_config)?.into();
let service_type = &tedge_config.service.ty;
let service_type = if service_type.is_empty() {
"service".to_string()
} else {
service_type.to_string()
};

// FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme

// there is one mapper instance per cloud per thin-edge instance, perhaps we should use some
// predefined topic id instead of trying to derive it from current device?
let entity_topic_id: EntityTopicId = tedge_config
.mqtt
.device_topic_id
.clone()
.parse()
.context("Invalid device_topic_id")?;

let mapper_service_topic_id = entity_topic_id
.default_service_for_device(CUMULOCITY_MAPPER_NAME)
.context("Can't derive service name if device topic id not in default scheme")?;

let mapper_service_external_id = CumulocityConverter::map_to_c8y_external_id(
&mapper_service_topic_id,
&main_device_xid,
);

let last_will_message_mapper =
c8y_api::smartrest::inventory::service_creation_message_payload(
mapper_service_external_id.as_ref(),
CUMULOCITY_MAPPER_NAME,
service_type.as_str(),
"down",
)?;
let last_will_message_bridge =
c8y_api::smartrest::inventory::service_creation_message_payload(
mapper_service_external_id.as_ref(),
&c8y_mapper_config.bridge_service_name(),
service_type.as_str(),
"down",
)?;

cloud_config.set_last_will(LastWill {
topic: "s/us".into(),
qos: QoS::AtLeastOnce,
message: format!("{last_will_message_bridge}\n{last_will_message_mapper}").into(),
retain: false,
});

runtime
.spawn(
MqttBridgeActorBuilder::new(
&tedge_config,
c8y_mapper_config.bridge_service_name(),
tc,
cloud_config,
)
.await,
)
.await?;
}
let mut jwt_actor = C8YJwtRetriever::builder(
mqtt_config.clone(),
Expand Down
Loading

0 comments on commit ba2b00a

Please sign in to comment.