From 2f55fbcd797fc5c71e8f53811ccbc400a571b9ae Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Fri, 12 Jul 2024 14:02:55 +0200 Subject: [PATCH] Remove `Send` requirement from visitor function --- Cargo.lock | 1 + crates/ruff_db/Cargo.toml | 1 + crates/ruff_db/src/system/memory_fs.rs | 15 +- crates/ruff_db/src/system/os.rs | 163 +++++++++++++------- crates/ruff_db/src/system/walk_directory.rs | 46 +----- 5 files changed, 118 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d5e8f9341200de..f853c5eb3b13a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2097,6 +2097,7 @@ dependencies = [ "filetime", "ignore", "insta", + "rayon", "ruff_python_ast", "ruff_python_parser", "ruff_source_file", diff --git a/crates/ruff_db/Cargo.toml b/crates/ruff_db/Cargo.toml index bbaf27ace212b6..64f35449f78e2c 100644 --- a/crates/ruff_db/Cargo.toml +++ b/crates/ruff_db/Cargo.toml @@ -21,6 +21,7 @@ countme = { workspace = true } dashmap = { workspace = true } filetime = { workspace = true } ignore = { workspace = true, optional = true } +rayon = { workspace = true, optional = true } salsa = { workspace = true } tracing = { workspace = true } rustc-hash = { workspace = true } diff --git a/crates/ruff_db/src/system/memory_fs.rs b/crates/ruff_db/src/system/memory_fs.rs index a6bd2d79e90be6..5dac5b2bcc95ed 100644 --- a/crates/ruff_db/src/system/memory_fs.rs +++ b/crates/ruff_db/src/system/memory_fs.rs @@ -10,7 +10,7 @@ use crate::system::{ }; use super::walk_directory::{ - DirectoryWalker, WalkDirectoryBuilder, WalkDirectoryConfiguration, WalkDirectoryVisitorBuilder, + DirectoryWalker, WalkDirectoryBuilder, WalkDirectoryConfiguration, WalkDirectoryVisitor, }; /// File system that stores all content in memory. @@ -414,18 +414,13 @@ struct MemoryWalker { } impl DirectoryWalker for MemoryWalker { - fn walk( - &self, - builder: &mut dyn WalkDirectoryVisitorBuilder, - configuration: WalkDirectoryConfiguration, - ) { + fn walk(&self, configuration: WalkDirectoryConfiguration, visitor: &mut WalkDirectoryVisitor) { let WalkDirectoryConfiguration { paths, hidden, standard_filters: _, } = configuration; - let mut visitor = builder.build(); let mut queue: Vec<_> = paths.into_iter().map(|path| (path, 0)).collect(); while let Some((path, depth)) = queue.pop() { @@ -434,7 +429,7 @@ impl DirectoryWalker for MemoryWalker { let entries = match self.fs.read_directory(&path) { Ok(entries) => entries, Err(error) => { - visitor.visit(Err(walk_directory::Error { + visitor(Err(walk_directory::Error { depth: Some(depth), kind: walk_directory::ErrorKind::Io { path: Some(path), @@ -457,7 +452,7 @@ impl DirectoryWalker for MemoryWalker { continue; } - let state = visitor.visit(Ok(walk_directory::DirectoryEntry { + let state = visitor(Ok(walk_directory::DirectoryEntry { path: entry.path.clone(), file_type: entry.file_type, depth, @@ -474,7 +469,7 @@ impl DirectoryWalker for MemoryWalker { } } Err(error) => { - visitor.visit(Err(walk_directory::Error { + visitor(Err(walk_directory::Error { depth: Some(depth), kind: walk_directory::ErrorKind::Io { path: None, diff --git a/crates/ruff_db/src/system/os.rs b/crates/ruff_db/src/system/os.rs index 3c3ca5a43a53d3..a603209b60958b 100644 --- a/crates/ruff_db/src/system/os.rs +++ b/crates/ruff_db/src/system/os.rs @@ -1,13 +1,14 @@ use crate::system::{ DirectoryEntry, FileType, Metadata, Result, System, SystemPath, SystemPathBuf, }; +use camino::Utf8Path; use filetime::FileTime; use std::sync::Arc; use std::{any::Any, path::PathBuf}; use super::walk_directory::{ self, DirectoryEntry, DirectoryWalker, WalkDirectoryBuilder, WalkDirectoryConfiguration, - WalkDirectoryVisitorBuilder, WalkState, + WalkDirectoryVisitor, WalkState, }; /// A system implementation that uses the OS file system. @@ -98,11 +99,7 @@ impl System for OsSystem { struct OsDirectoryWalker; impl DirectoryWalker for OsDirectoryWalker { - fn walk( - &self, - visitor_builder: &mut dyn WalkDirectoryVisitorBuilder, - configuration: WalkDirectoryConfiguration, - ) { + fn walk(&self, configuration: WalkDirectoryConfiguration, visitor: &mut WalkDirectoryVisitor) { let WalkDirectoryConfiguration { paths, hidden, @@ -122,61 +119,113 @@ impl DirectoryWalker for OsDirectoryWalker { builder.add(additional_path.as_std_path()); } - builder.threads( - std::thread::available_parallelism() - .map_or(1, std::num::NonZeroUsize::get) - .min(12), - ); - - builder.build_parallel().run(|| { - let mut visitor = visitor_builder.build(); - - Box::new(move |entry| { - match entry { - Ok(entry) => { - // SAFETY: WalkDirectoryVisitor doesn't support walking stdin. - let file_type = entry.file_type().unwrap(); - let depth = entry.depth(); - - // If there's an error related to a git ignore file, warn. - // Errors related to traversing the file system are reported alongside the entry. - if let Some(error) = entry.error() { - tracing::warn!("{error}"); - } + let threads = std::thread::available_parallelism() + .map_or(1, std::num::NonZeroUsize::get) + .min(6); + + // The visitor threads use this channel to send directory entries back to the main thread, + // and the main thread calls the visitor on them. + let (main_sender, main_receiver) = std::sync::mpsc::sync_channel::<( + usize, + std::result::Result, + )>(threads); + + // Create channels that allow sending messages from the main-thread back to a visitor thread. + // This is used to send back the `WalkState` for a directory entry. + let mut visitor_senders = Vec::with_capacity(threads); + let mut visitor_receivers = Vec::with_capacity(threads); + + for _ in 0..threads { + let (visitor_sender, visitor_receiver) = std::sync::mpsc::sync_channel::(1); + visitor_senders.push(visitor_sender); + visitor_receivers.push(visitor_receiver); + } - match SystemPathBuf::from_path_buf(entry.into_path()) { - Ok(path) => { - let directory_entry = DirectoryEntry { - path, - file_type: FileType::from(file_type), - depth, - }; + std::thread::scope(|scope| { + scope.spawn(move || { + let mut next_visitor_id = 0usize; + let mut main_sender = Some(main_sender); - visitor.visit(Ok(directory_entry)).into() - } - Err(path) => { - visitor.visit(Err(walk_directory::Error { - depth: Some(depth), - kind: walk_directory::ErrorKind::NonUtf8Path { path }, - })); - - // Skip the entire directory because all the paths won't be UTF-8 paths. - ignore::WalkState::Skip - } - } + builder.build_parallel().run(|| { + let visitor_id = next_visitor_id; + let receiver = visitor_receivers + .pop() + .expect("Builder to not create more threads than "); + + next_visitor_id += 1; + + let main = main_sender + .as_ref() + .expect("Main sender to not be null.") + .clone(); + + if visitor_receivers.len() > 0 { + // Drop the main sender + main_sender = None; } - Err(error) => match ignore_to_walk_directory_error(error, None, None) { - Ok(error) => visitor.visit(Err(error)).into(), - Err(error) => { - // This should only be reached when the error is an ignore related error - // (which, should not be reported here but the `ignore` crate doesn't distinguish between ignore and IO errors). - // Let's log the error to at least make it visible. - tracing::warn!("Failed to traverse directory: {error}."); - ignore::WalkState::Continue + + Box::new(move |entry| { + let visit = |entry| { + main.send((visitor_id, entry)).unwrap(); + receiver.recv().unwrap().into() + }; + + match entry { + Ok(entry) => { + // SAFETY: WalkDirectoryVisitor doesn't support walking stdin. + let file_type = entry.file_type().unwrap(); + let depth = entry.depth(); + + // If there's an error related to a git ignore file, warn. + // Errors related to traversing the file system are reported alongside the entry. + if let Some(error) = entry.error() { + tracing::warn!("{error}"); + } + + match SystemPathBuf::from_path_buf(entry.into_path()) { + Ok(path) => { + let directory_entry = DirectoryEntry { + path, + file_type: FileType::from(file_type), + depth, + }; + + let path: Utf8Path; + + visit(Ok(directory_entry)) + } + Err(path) => { + visit(Err(walk_directory::Error { + depth: Some(depth), + kind: walk_directory::ErrorKind::NonUtf8Path { path }, + })); + + // Skip the entire directory because all the paths won't be UTF-8 paths. + ignore::WalkState::Skip + } + } + } + Err(error) => match ignore_to_walk_directory_error(error, None, None) { + Ok(error) => visit(Err(error)), + Err(error) => { + // This should only be reached when the error is an ignore related error + // (which, should not be reported here but the `ignore` crate doesn't distinguish between ignore and IO errors). + // Let's log the error to at least make it visible. + tracing::warn!("Failed to traverse directory: {error}."); + ignore::WalkState::Continue + } + }, } - }, - } - }) + }) + }); + }); + + for (visitor_id, entry) in main_receiver { + let visitor_sender = &visitor_senders[visitor_id]; + + let state = visitor(entry); + visitor_sender.send(state).unwrap(); + } }); } } diff --git a/crates/ruff_db/src/system/walk_directory.rs b/crates/ruff_db/src/system/walk_directory.rs index 48645accc257cf..ecf8e56e7cac04 100644 --- a/crates/ruff_db/src/system/walk_directory.rs +++ b/crates/ruff_db/src/system/walk_directory.rs @@ -60,9 +60,9 @@ impl WalkDirectoryBuilder { /// Runs the directory traversal and calls `builder` to create a visitor for each thread /// that does the visiting. It's up to the actual walker to decide how many threads to use. - pub fn run<'s, F>(self, builder: F) + pub fn run(self, visitor: &mut F) where - F: FnMut() -> FnVisitor<'s>, + F: FnMut(std::result::Result) -> WalkState + 'static, { let configuration = WalkDirectoryConfiguration { paths: self.paths, @@ -70,53 +70,17 @@ impl WalkDirectoryBuilder { standard_filters: self.standard_filters, }; - self.walker.walk(&mut FnBuilder { builder }, configuration); + self.walker.walk(configuration, visitor); } } /// Concrete walker that performs the directory walking. pub trait DirectoryWalker { - fn walk( - &self, - builder: &mut dyn WalkDirectoryVisitorBuilder, - configuration: WalkDirectoryConfiguration, - ); -} - -/// Creates a visitor for each thread that does the visiting. -pub trait WalkDirectoryVisitorBuilder<'s> { - fn build(&mut self) -> Box; + fn walk(&self, configuration: WalkDirectoryConfiguration, visistor: &mut WalkDirectoryVisitor); } /// Visitor handling the individual directory entries. -pub trait WalkDirectoryVisitor: Send { - fn visit(&mut self, entry: std::result::Result) -> WalkState; -} - -struct FnBuilder { - builder: F, -} - -impl<'s, F> WalkDirectoryVisitorBuilder<'s> for FnBuilder -where - F: FnMut() -> FnVisitor<'s>, -{ - fn build(&mut self) -> Box { - let visitor = (self.builder)(); - Box::new(FnVisitorImpl(visitor)) - } -} - -type FnVisitor<'s> = - Box) -> WalkState + Send + 's>; - -struct FnVisitorImpl<'s>(FnVisitor<'s>); - -impl WalkDirectoryVisitor for FnVisitorImpl<'_> { - fn visit(&mut self, entry: std::result::Result) -> WalkState { - (self.0)(entry) - } -} +pub type WalkDirectoryVisitor = dyn FnMut(std::result::Result) -> WalkState; pub struct WalkDirectoryConfiguration { pub paths: Vec,