Update to reuse searchers and support binary detection

pull/136/head
Chip Senkbeil 2 years ago
parent f67d14de7a
commit 86ed409765
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

Cargo.lock generated

@@ -767,6 +767,7 @@ dependencies = [
"indoc",
"log",
"notify",
"num_cpus",
"once_cell",
"portable-pty",
"predicates",

@@ -26,6 +26,7 @@ hex = "0.4.3"
ignore = "0.4.18"
log = "0.4.17"
notify = { version = "=5.0.0-pre.15", features = ["serde"] }
num_cpus = "1.13.1"
once_cell = "1.13.0"
portable-pty = "0.7.0"
rand = { version = "0.8.5", features = ["getrandom"] }

@@ -6,17 +6,22 @@ use crate::data::{
use distant_net::Reply;
use grep::{
matcher::Matcher,
regex::RegexMatcher,
searcher::{Searcher, Sink, SinkMatch},
regex::{RegexMatcher, RegexMatcherBuilder},
searcher::{BinaryDetection, Searcher, SearcherBuilder, Sink, SinkMatch},
};
use ignore::{
types::TypesBuilder, DirEntry, ParallelVisitor, ParallelVisitorBuilder, WalkBuilder,
WalkParallel,
};
use ignore::{DirEntry, ParallelVisitor, ParallelVisitorBuilder, WalkBuilder, WalkParallel};
use log::*;
use std::{collections::HashMap, io, ops::Deref, path::Path};
use std::{cmp, collections::HashMap, io, ops::Deref, path::Path};
use tokio::{
sync::{broadcast, mpsc, oneshot},
task::JoinHandle,
};
const MAXIMUM_SEARCH_THREADS: usize = 12;
/// Holds information related to active searches on the server
pub struct SearchState {
channel: SearchChannel,
@@ -273,7 +278,19 @@ impl SearchQueryExecutor {
let (match_tx, match_rx) = mpsc::unbounded_channel();
let regex = query.condition.to_regex_string();
let matcher = RegexMatcher::new(&regex)
let mut matcher_builder = RegexMatcherBuilder::new();
matcher_builder
.case_insensitive(false)
.case_smart(false)
.multi_line(true)
.dot_matches_new_line(false)
.swap_greed(false)
.ignore_whitespace(false)
.unicode(true)
.octal(false)
.line_terminator(Some(b'\n'));
let matcher = matcher_builder
.build(&regex)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidInput, x))?;
if query.paths.is_empty() {
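For reference, a minimal standalone sketch of building a matcher with the options above and exercising it directly through the Matcher trait; it assumes the grep facade crate (which re-exports grep-regex and grep-matcher), and the pattern and haystack are made up for illustration:

use grep::matcher::Matcher;
use grep::regex::RegexMatcherBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Mirror the notable options from above: anchors apply per line, and a match
    // is never allowed to contain the line terminator itself.
    let matcher = RegexMatcherBuilder::new()
        .multi_line(true)
        .line_terminator(Some(b'\n'))
        .build(r"search\w*")?;

    // find() is provided by the Matcher trait and returns the first match, if any.
    let m = matcher.find(b"reuse searchers\n")?.expect("pattern should match");
    assert_eq!(m.start(), 6); // "searchers" begins at byte offset 6
    Ok(())
}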
@@ -285,11 +302,8 @@ impl SearchQueryExecutor {
walker_builder.add(path);
}
// TODO: Use something like num_cpus to determine thread count
walker_builder
.skip_stdout(true)
.follow_links(query.options.follow_symbolic_links)
.threads(8)
.max_depth(
query
.options
@@ -297,7 +311,15 @@ impl SearchQueryExecutor {
.as_ref()
.copied()
.map(|d| d as usize),
);
)
.threads(cmp::min(MAXIMUM_SEARCH_THREADS, num_cpus::get()))
.types(
TypesBuilder::new()
.add_defaults()
.build()
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?,
)
.skip_stdout(true);
Ok(Self {
id: rand::random(),
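The walker is capped at MAXIMUM_SEARCH_THREADS worker threads but still scales down on machines with fewer cores via num_cpus::get(). A short sketch of the same configuration in isolation, assuming the ignore 0.4 and num_cpus 1.x APIs (build_walker and its path argument are illustrative names, not from this codebase):

use ignore::{types::TypesBuilder, WalkBuilder};
use std::cmp;

const MAXIMUM_SEARCH_THREADS: usize = 12;

fn build_walker(path: &str) -> Result<ignore::WalkParallel, ignore::Error> {
    let mut builder = WalkBuilder::new(path);
    builder
        // Never spawn more threads than cores, and never more than the fixed cap
        .threads(cmp::min(MAXIMUM_SEARCH_THREADS, num_cpus::get()))
        // Attach the default file-type definitions shipped with the ignore crate
        .types(TypesBuilder::new().add_defaults().build()?)
        // Skip any file that appears to be our own redirected stdout
        .skip_stdout(true);
    Ok(builder.build_parallel())
}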
@@ -414,12 +436,35 @@ struct SearchQueryExecutorParallelVistorBuilder<'a> {
impl<'a> ParallelVisitorBuilder<'a> for SearchQueryExecutorParallelVistorBuilder<'a> {
fn build(&mut self) -> Box<dyn ParallelVisitor + 'a> {
// For files that are searched as part of a recursive search
//
// Details:
// * Will quit early if detecting binary file due to null byte
//
// NOTE: Searchers are not Send/Sync so we must create them here
let implicit_searcher = SearcherBuilder::new()
.binary_detection(BinaryDetection::quit(0))
.build();
// For files that are searched because they are provided as one of our initial paths
// (so explicitly by the user)
//
// Details:
// * Will convert binary data with null bytes into newlines
//
// NOTE: Searchers are not Send/Sync so we must create them here
let explicit_searcher = SearcherBuilder::new()
.binary_detection(BinaryDetection::convert(0))
.build();
Box::new(SearchQueryExecutorParallelVistor {
search_id: self.search_id,
target: self.target,
cancel: self.cancel.resubscribe(),
tx: self.tx.clone(),
matcher: self.matcher,
implicit_searcher,
explicit_searcher,
include_path_filter: self.include_path_filter,
exclude_path_filter: self.exclude_path_filter,
options_filter: self.options_filter,
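A small self-contained sketch of what the two binary-detection modes above do, assuming the grep crate's SearcherBuilder and its sinks::UTF8 convenience sink: quit(0) stops searching a file as soon as a NUL byte is seen, while convert(0) treats NUL bytes as line terminators and keeps going, so explicitly named files still produce results even when they contain some binary data.

use grep::regex::RegexMatcher;
use grep::searcher::{sinks::UTF8, BinaryDetection, SearcherBuilder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two matching regions separated by a NUL byte that marks the data as binary
    let data: &[u8] = b"match me\x00match me again\n";
    let matcher = RegexMatcher::new("match")?;

    for detection in [BinaryDetection::quit(0), BinaryDetection::convert(0)] {
        let mut searcher = SearcherBuilder::new()
            .binary_detection(detection)
            .build();
        let mut hits = 0;
        searcher.search_slice(&matcher, data, UTF8(|_line_number, _line| {
            hits += 1;
            Ok(true) // keep searching
        }))?;
        println!("matching lines found: {hits}");
    }
    Ok(())
}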
@@ -433,6 +478,8 @@ struct SearchQueryExecutorParallelVistor<'a> {
cancel: broadcast::Receiver<()>,
tx: mpsc::UnboundedSender<SearchQueryMatch>,
matcher: &'a RegexMatcher,
implicit_searcher: Searcher,
explicit_searcher: Searcher,
include_path_filter: &'a SearchQueryPathFilter,
exclude_path_filter: &'a SearchQueryPathFilter,
options_filter: &'a SearchQueryOptionsFilter,
@@ -470,11 +517,18 @@ impl<'a> ParallelVisitor for SearchQueryExecutorParallelVistor<'a> {
}
}
// Pick searcher based on whether this was an explicit or recursive path
let searcher = if entry.depth() == 0 {
&mut self.explicit_searcher
} else {
&mut self.implicit_searcher
};
let res = match self.target {
// Perform the search against the path itself
SearchQueryTarget::Path => {
let path_str = entry.path().to_string_lossy();
Searcher::new().search_slice(
searcher.search_slice(
self.matcher,
path_str.as_bytes(),
SearchQueryPathSink {
@@ -487,7 +541,7 @@ impl<'a> ParallelVisitor for SearchQueryExecutorParallelVistor<'a> {
}
// Perform the search against the file's contents
SearchQueryTarget::Contents => Searcher::new().search_path(
SearchQueryTarget::Contents => searcher.search_path(
self.matcher,
entry.path(),
SearchQueryContentsSink {
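The depth() == 0 test above works because the ignore walker yields every root handed to WalkBuilder as a depth-0 entry, while anything discovered by recursing into a directory has depth() >= 1, so depth 0 identifies the paths the user supplied explicitly. A tiny sketch to observe this, assuming ignore 0.4 (the directory "." is arbitrary):

use ignore::WalkBuilder;

fn main() {
    // Walk the current directory one level deep: the root itself is reported
    // with depth 0, its immediate children with depth 1.
    for entry in WalkBuilder::new(".").max_depth(Some(1)).build().flatten() {
        println!("{} depth={}", entry.path().display(), entry.depth());
    }
}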

@@ -121,7 +121,7 @@ where
match result {
Ok(x) => x,
Err(x) => {
error!("Server no longer accepting connections: {}", x);
error!("Server no longer accepting connections: {x}");
if let Some(timer) = shutdown_timer.take() {
timer.lock().await.abort();
}
@@ -160,9 +160,8 @@ where
let (tx, mut rx) = mpsc::channel::<Response<Res>>(1);
connection.writer_task = Some(tokio::spawn(async move {
while let Some(data) = rx.recv().await {
// trace!("[Conn {}] Sending {:?}", connection_id, data.payload);
if let Err(x) = writer.write(data).await {
error!("[Conn {}] Failed to send {:?}", connection_id, x);
error!("[Conn {connection_id}] Failed to send {x}");
break;
}
}
@@ -194,7 +193,7 @@ where
server.on_request(ctx).await;
}
Ok(None) => {
debug!("[Conn {}] Connection closed", connection_id);
debug!("[Conn {connection_id}] Connection closed");
// Remove the connection from our state if it has closed
if let Some(state) = Weak::upgrade(&weak_state) {
@@ -214,7 +213,7 @@ where
// if someone sends bad data at any point, but does not
// mean that the reader itself has failed. This can
// happen from getting non-compliant typed data
error!("[Conn {}] {}", connection_id, x);
error!("[Conn {connection_id}] {x}");
}
}
}
