Skip to content

Commit

Permalink
Remove Send requirement from visitor function
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaReiser committed Jul 12, 2024
1 parent 96392a5 commit 2f55fbc
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 108 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/ruff_db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ countme = { workspace = true }
dashmap = { workspace = true }
filetime = { workspace = true }
ignore = { workspace = true, optional = true }
rayon = { workspace = true, optional = true }
salsa = { workspace = true }
tracing = { workspace = true }
rustc-hash = { workspace = true }
Expand Down
15 changes: 5 additions & 10 deletions crates/ruff_db/src/system/memory_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::system::{
};

use super::walk_directory::{
DirectoryWalker, WalkDirectoryBuilder, WalkDirectoryConfiguration, WalkDirectoryVisitorBuilder,
DirectoryWalker, WalkDirectoryBuilder, WalkDirectoryConfiguration, WalkDirectoryVisitor,
};

/// File system that stores all content in memory.
Expand Down Expand Up @@ -414,18 +414,13 @@ struct MemoryWalker {
}

impl DirectoryWalker for MemoryWalker {
fn walk(
&self,
builder: &mut dyn WalkDirectoryVisitorBuilder,
configuration: WalkDirectoryConfiguration,
) {
fn walk(&self, configuration: WalkDirectoryConfiguration, visitor: &mut WalkDirectoryVisitor) {
let WalkDirectoryConfiguration {
paths,
hidden,
standard_filters: _,
} = configuration;

let mut visitor = builder.build();
let mut queue: Vec<_> = paths.into_iter().map(|path| (path, 0)).collect();

while let Some((path, depth)) = queue.pop() {
Expand All @@ -434,7 +429,7 @@ impl DirectoryWalker for MemoryWalker {
let entries = match self.fs.read_directory(&path) {
Ok(entries) => entries,
Err(error) => {
visitor.visit(Err(walk_directory::Error {
visitor(Err(walk_directory::Error {
depth: Some(depth),
kind: walk_directory::ErrorKind::Io {
path: Some(path),
Expand All @@ -457,7 +452,7 @@ impl DirectoryWalker for MemoryWalker {
continue;
}

let state = visitor.visit(Ok(walk_directory::DirectoryEntry {
let state = visitor(Ok(walk_directory::DirectoryEntry {
path: entry.path.clone(),
file_type: entry.file_type,
depth,
Expand All @@ -474,7 +469,7 @@ impl DirectoryWalker for MemoryWalker {
}
}
Err(error) => {
visitor.visit(Err(walk_directory::Error {
visitor(Err(walk_directory::Error {
depth: Some(depth),
kind: walk_directory::ErrorKind::Io {
path: None,
Expand Down
163 changes: 106 additions & 57 deletions crates/ruff_db/src/system/os.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::system::{
DirectoryEntry, FileType, Metadata, Result, System, SystemPath, SystemPathBuf,
};
use camino::Utf8Path;
use filetime::FileTime;
use std::sync::Arc;
use std::{any::Any, path::PathBuf};

use super::walk_directory::{
self, DirectoryEntry, DirectoryWalker, WalkDirectoryBuilder, WalkDirectoryConfiguration,
WalkDirectoryVisitorBuilder, WalkState,
WalkDirectoryVisitor, WalkState,
};

/// A system implementation that uses the OS file system.
Expand Down Expand Up @@ -98,11 +99,7 @@ impl System for OsSystem {
struct OsDirectoryWalker;

impl DirectoryWalker for OsDirectoryWalker {
fn walk(
&self,
visitor_builder: &mut dyn WalkDirectoryVisitorBuilder,
configuration: WalkDirectoryConfiguration,
) {
fn walk(&self, configuration: WalkDirectoryConfiguration, visitor: &mut WalkDirectoryVisitor) {
let WalkDirectoryConfiguration {
paths,
hidden,
Expand All @@ -122,61 +119,113 @@ impl DirectoryWalker for OsDirectoryWalker {
builder.add(additional_path.as_std_path());
}

builder.threads(
std::thread::available_parallelism()
.map_or(1, std::num::NonZeroUsize::get)
.min(12),
);

builder.build_parallel().run(|| {
let mut visitor = visitor_builder.build();

Box::new(move |entry| {
match entry {
Ok(entry) => {
// SAFETY: WalkDirectoryVisitor doesn't support walking stdin.
let file_type = entry.file_type().unwrap();
let depth = entry.depth();

// If there's an error related to a git ignore file, warn.
// Errors related to traversing the file system are reported alongside the entry.
if let Some(error) = entry.error() {
tracing::warn!("{error}");
}
let threads = std::thread::available_parallelism()
.map_or(1, std::num::NonZeroUsize::get)
.min(6);

// The visitor threads use this channel to send directory entries back to the main thread,
// and the main thread calls the visitor on them.
let (main_sender, main_receiver) = std::sync::mpsc::sync_channel::<(
usize,
std::result::Result<DirectoryEntry, walk_directory::Error>,
)>(threads);

// Create channels that allow sending messages from the main-thread back to a visitor thread.
// This is used to send back the `WalkState` for a directory entry.
let mut visitor_senders = Vec::with_capacity(threads);
let mut visitor_receivers = Vec::with_capacity(threads);

for _ in 0..threads {
let (visitor_sender, visitor_receiver) = std::sync::mpsc::sync_channel::<WalkState>(1);
visitor_senders.push(visitor_sender);
visitor_receivers.push(visitor_receiver);
}

match SystemPathBuf::from_path_buf(entry.into_path()) {
Ok(path) => {
let directory_entry = DirectoryEntry {
path,
file_type: FileType::from(file_type),
depth,
};
std::thread::scope(|scope| {
scope.spawn(move || {
let mut next_visitor_id = 0usize;
let mut main_sender = Some(main_sender);

visitor.visit(Ok(directory_entry)).into()
}
Err(path) => {
visitor.visit(Err(walk_directory::Error {
depth: Some(depth),
kind: walk_directory::ErrorKind::NonUtf8Path { path },
}));

// Skip the entire directory because all the paths won't be UTF-8 paths.
ignore::WalkState::Skip
}
}
builder.build_parallel().run(|| {
let visitor_id = next_visitor_id;
let receiver = visitor_receivers
.pop()
.expect("Builder to not create more threads than <num_threads>");

next_visitor_id += 1;

let main = main_sender
.as_ref()
.expect("Main sender to not be null.")
.clone();

if visitor_receivers.len() > 0 {
// Drop the main sender
main_sender = None;
}
Err(error) => match ignore_to_walk_directory_error(error, None, None) {
Ok(error) => visitor.visit(Err(error)).into(),
Err(error) => {
// This should only be reached when the error is an ignore related error
// (which, should not be reported here but the `ignore` crate doesn't distinguish between ignore and IO errors).
// Let's log the error to at least make it visible.
tracing::warn!("Failed to traverse directory: {error}.");
ignore::WalkState::Continue

Box::new(move |entry| {
let visit = |entry| {
main.send((visitor_id, entry)).unwrap();
receiver.recv().unwrap().into()
};

match entry {
Ok(entry) => {
// SAFETY: WalkDirectoryVisitor doesn't support walking stdin.
let file_type = entry.file_type().unwrap();
let depth = entry.depth();

// If there's an error related to a git ignore file, warn.
// Errors related to traversing the file system are reported alongside the entry.
if let Some(error) = entry.error() {
tracing::warn!("{error}");
}

match SystemPathBuf::from_path_buf(entry.into_path()) {
Ok(path) => {
let directory_entry = DirectoryEntry {
path,
file_type: FileType::from(file_type),
depth,
};

let path: Utf8Path;

visit(Ok(directory_entry))
}
Err(path) => {
visit(Err(walk_directory::Error {
depth: Some(depth),
kind: walk_directory::ErrorKind::NonUtf8Path { path },
}));

// Skip the entire directory because all the paths won't be UTF-8 paths.
ignore::WalkState::Skip
}
}
}
Err(error) => match ignore_to_walk_directory_error(error, None, None) {
Ok(error) => visit(Err(error)),
Err(error) => {
// This should only be reached when the error is an ignore related error
// (which, should not be reported here but the `ignore` crate doesn't distinguish between ignore and IO errors).
// Let's log the error to at least make it visible.
tracing::warn!("Failed to traverse directory: {error}.");
ignore::WalkState::Continue
}
},
}
},
}
})
})
});
});

for (visitor_id, entry) in main_receiver {
let visitor_sender = &visitor_senders[visitor_id];

let state = visitor(entry);
visitor_sender.send(state).unwrap();
}
});
}
}
Expand Down
46 changes: 5 additions & 41 deletions crates/ruff_db/src/system/walk_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,63 +60,27 @@ impl WalkDirectoryBuilder {

/// Runs the directory traversal and calls `builder` to create a visitor for each thread
/// that does the visiting. It's up to the actual walker to decide how many threads to use.
pub fn run<'s, F>(self, builder: F)
pub fn run<F>(self, visitor: &mut F)
where
F: FnMut() -> FnVisitor<'s>,
F: FnMut(std::result::Result<DirectoryEntry, Error>) -> WalkState + 'static,
{
let configuration = WalkDirectoryConfiguration {
paths: self.paths,
hidden: self.ignore_hidden,
standard_filters: self.standard_filters,
};

self.walker.walk(&mut FnBuilder { builder }, configuration);
self.walker.walk(configuration, visitor);
}
}

/// Concrete walker that performs the directory walking.
pub trait DirectoryWalker {
fn walk(
&self,
builder: &mut dyn WalkDirectoryVisitorBuilder,
configuration: WalkDirectoryConfiguration,
);
}

/// Creates a visitor for each thread that does the visiting.
pub trait WalkDirectoryVisitorBuilder<'s> {
fn build(&mut self) -> Box<dyn WalkDirectoryVisitor + 's>;
fn walk(&self, configuration: WalkDirectoryConfiguration, visistor: &mut WalkDirectoryVisitor);
}

/// Visitor handling the individual directory entries.
pub trait WalkDirectoryVisitor: Send {
fn visit(&mut self, entry: std::result::Result<DirectoryEntry, Error>) -> WalkState;
}

struct FnBuilder<F> {
builder: F,
}

impl<'s, F> WalkDirectoryVisitorBuilder<'s> for FnBuilder<F>
where
F: FnMut() -> FnVisitor<'s>,
{
fn build(&mut self) -> Box<dyn WalkDirectoryVisitor + 's> {
let visitor = (self.builder)();
Box::new(FnVisitorImpl(visitor))
}
}

type FnVisitor<'s> =
Box<dyn FnMut(std::result::Result<DirectoryEntry, Error>) -> WalkState + Send + 's>;

struct FnVisitorImpl<'s>(FnVisitor<'s>);

impl WalkDirectoryVisitor for FnVisitorImpl<'_> {
fn visit(&mut self, entry: std::result::Result<DirectoryEntry, Error>) -> WalkState {
(self.0)(entry)
}
}
pub type WalkDirectoryVisitor = dyn FnMut(std::result::Result<DirectoryEntry, Error>) -> WalkState;

pub struct WalkDirectoryConfiguration {
pub paths: Vec<SystemPathBuf>,
Expand Down

0 comments on commit 2f55fbc

Please sign in to comment.