Skip to content

Commit

Permalink
✨ feat: implement async multithreading and engine selection code
Browse files Browse the repository at this point in the history
  • Loading branch information
neon-mmd committed Jul 11, 2023
1 parent 897ab08 commit 0781385
Showing 1 changed file with 85 additions and 34 deletions.
119 changes: 85 additions & 34 deletions src/search_results_handler/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@

use std::{collections::HashMap, time::Duration};

use error_stack::Report;
use rand::Rng;
use tokio::join;
use tokio::task::JoinHandle;

use super::{
aggregation_models::{RawSearchResult, SearchResult, SearchResults},
user_agent::random_user_agent,
};

use crate::engines::{duckduckgo, searx};
use crate::engines::{
duckduckgo,
engine_models::{EngineError, SearchEngine},
searx,
};

/// A function that aggregates all the scraped results from the above upstream engines and
/// then removes duplicate results and if two results are found to be from two or more engines
Expand All @@ -37,10 +42,11 @@ use crate::engines::{duckduckgo, searx};
/// function in either `searx` or `duckduckgo` or both otherwise returns a `SearchResults struct`
/// containing appropriate values.
pub async fn aggregate(
query: &str,
query: String,
page: u32,
random_delay: bool,
debug: bool,
upstream_search_engines: Vec<String>,
) -> Result<SearchResults, Box<dyn std::error::Error>> {
let user_agent: String = random_user_agent();
let mut result_map: HashMap<String, RawSearchResult> = HashMap::new();
Expand All @@ -53,41 +59,86 @@ pub async fn aggregate(
}

// fetch results from upstream search engines simultaneously/concurrently.
let (ddg_map_results, searx_map_results) = join!(
duckduckgo::results(query, page, &user_agent),
searx::results(query, page, &user_agent)
);
let search_engines: Vec<Box<dyn SearchEngine>> = upstream_search_engines
.iter()
.map(|engine| match engine.to_lowercase().as_str() {

[CI annotation] Check failure on line 64 in src/search_results_handler/aggregator.rs — GitHub Actions / Rust project: non-exhaustive patterns: `&_` not covered (the `match` on the lowercased engine name handles only `"duckduckgo"` and `"searx "`, so the compiler requires a catch-all or error arm).
"duckduckgo" => Box::new(duckduckgo::DuckDuckGo) as Box<dyn SearchEngine>,
"searx " => Box::new(searx::Searx) as Box<dyn SearchEngine>,
})
.collect();

let ddg_map_results = ddg_map_results.unwrap_or_else(|e| {
if debug {
log::error!("Error fetching results from DuckDuckGo: {:?}", e);
}
HashMap::new()
});
let tasks: Vec<JoinHandle<Result<HashMap<String, RawSearchResult>, Report<EngineError>>>> =
search_engines

[CI annotation] Check failure on line 71 in src/search_results_handler/aggregator.rs — GitHub Actions / Rust project: `search_engines` does not live long enough (`tokio::spawn` requires a `'static` future, but the spawned closure borrows from the local `search_engines` vector via `.iter()`).
.iter()
.map(|search_engine| {
tokio::spawn(search_engine.results(query.clone(), page, user_agent.clone()))
})
.collect();

let searx_map_results = searx_map_results.unwrap_or_else(|e| {
if debug {
log::error!("Error fetching results from Searx: {:?}", e);
}
HashMap::new()
});
let mut outputs = Vec::with_capacity(search_engines.len());

result_map.extend(ddg_map_results);
for task in tasks {
outputs.push(task.await.ok())
}

searx_map_results.into_iter().for_each(|(key, value)| {
result_map
.entry(key)
.and_modify(|result| {
result.add_engines(value.clone().engine());
})
.or_insert_with(|| -> RawSearchResult {
RawSearchResult::new(
value.title.clone(),
value.visiting_url.clone(),
value.description.clone(),
value.engine.clone(),
)
});
let mut initial: bool = true;
let mut counter: usize = 0;
outputs.iter().for_each(|results| {
if initial {
match results {
Some(result) => {
let new_result = result.clone();
result_map.extend(new_result.as_ref().unwrap().clone());
counter += 1;
initial = false
}
None => {
if debug {
log::error!(
"Error fetching results from {}",
upstream_search_engines[counter]
);
};
counter += 1
}
}
} else {
match results {
Some(result) => {
let new_result = result.clone();
new_result
.as_ref()
.unwrap()
.clone()
.into_iter()
.for_each(|(key, value)| {
result_map
.entry(key)
.and_modify(|result| {
result.add_engines(value.clone().engine());
})
.or_insert_with(|| -> RawSearchResult {
RawSearchResult::new(
value.title.clone(),
value.visiting_url.clone(),
value.description.clone(),
value.engine.clone(),
)
});
});
counter += 1
}
None => {
if debug {
log::error!(
"Error fetching results from {}",
upstream_search_engines[counter]
);
};
counter += 1
}
}
}
});

Ok(SearchResults::new(
Expand Down

0 comments on commit 0781385

Please sign in to comment.