pull/5/merge
simonsan committed 3 months ago via GitHub
commit f7b95f8f9b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

Cargo.lock generated

@@ -1,23 +1,91 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "argparse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37bb99f5e39ee8b23b6e227f5b8f024207e8616f44aa4b8c76ecd828011667ef"
[[package]]
name = "byteorder"
version = "0.5.3"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "displaydoc"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "fingertips"
version = "0.1.0"
dependencies = [
"argparse 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"argparse",
"byteorder",
"displaydoc",
"thiserror",
]
[metadata]
"checksum argparse 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "37bb99f5e39ee8b23b6e227f5b8f024207e8616f44aa4b8c76ecd828011667ef"
"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
[[package]]
name = "proc-macro2"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef"
dependencies = [
"proc-macro2",
]
[[package]]
name = "syn"
version = "2.0.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "1.0.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "unicode-ident"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"

Cargo.toml
@@ -2,8 +2,25 @@
name = "fingertips"
version = "0.1.0"
authors = ["Jason Orendorff <jason.orendorff@gmail.com>"]
edition = "2018"
edition = "2021"
[dependencies]
argparse = "0.2.1"
byteorder = "0.5.3"
byteorder = "1.5.0"
displaydoc = "0.2.4"
thiserror = "1.0.58"
[lints.rust]
unsafe_code = "forbid"
missing_docs = "warn"
rust_2018_idioms = "warn"
trivial_casts = "warn"
unused_lifetimes = "warn"
unused_qualifications = "warn"
nonstandard_style = "warn"
dead_code = "warn"
[lints.clippy]
all = "warn"
unwrap_used = "warn"
expect_used = "warn"
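
For context: the `[lints]` tables above are the Cargo-native way to configure lints crate-wide (stabilized in Cargo 1.74), so the whole crate forbids `unsafe` and warns on `unwrap`/`expect`. A hypothetical snippet, not part of the PR, showing what these settings would flag:

// Under the lints above, clippy warns on the `unwrap`, and the
// commented-out `unsafe` block would be a hard error (unsafe_code = "forbid").
fn demo(input: Option<u32>) -> u32 {
    input.unwrap() // clippy::unwrap_used -> warning
    // unsafe { std::hint::unreachable_unchecked() } // unsafe_code -> compile error
}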

src/main.rs
@@ -0,0 +1,45 @@
//! `fingertips` creates an inverted index for a set of text files.
//!
//! Most of the actual work is done by the library modules `index`, `read`,
//! `write`, and `merge`. The library's `run` function puts the pieces
//! together in two different ways.
//!
//! * `run_single_threaded` simply does everything in one thread, in
//! the most straightforward possible way.
//!
//! * Then, we break the work into a five-stage pipeline so that we can run
//! it on multiple CPUs. `run_pipeline` puts the five stages together.
//!
//! This file provides the `main` function that handles command-line
//! arguments. It calls `run`, which dispatches to one of the two
//! strategies above.
use argparse::{ArgumentParser, Collect, StoreTrue};
use fingertips::run;
fn main() {
let mut single_threaded = false;
let mut filenames = vec![];
{
let mut ap = ArgumentParser::new();
ap.set_description("Make an inverted index for searching documents.");
_ = ap.refer(&mut single_threaded).add_option(
&["-1", "--single-threaded"],
StoreTrue,
"Do all the work on a single thread.",
);
_ = ap.refer(&mut filenames).add_argument(
"filenames",
Collect,
"Names of files/directories to index. \
For directories, all .txt files immediately \
under the directory are indexed.",
);
ap.parse_args_or_exit();
}
match run(filenames, single_threaded) {
Ok(()) => {}
Err(err) => eprintln!("error: {err}"),
}
}
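
For reference, a typical invocation of the resulting binary might look like `cargo run --release -- docs/` (indexing every `.txt` file directly under a hypothetical `docs/` directory), or add `-1`/`--single-threaded` to force the single-threaded path; the paths here are illustrative only.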

src/error.rs
@@ -0,0 +1,55 @@
use std::error::Error;
/// Result type returned by methods that can fail with a [`FingertipsError`].
pub type FingertipsResult<T> = Result<T, FingertipsError>;
/// Errors that can result from Fingertips.
// [`Error`] is public, but opaque and easy to keep compatible.
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
pub struct FingertipsError(#[from] FingertipsErrorKind);
// Accessors for anything we do want to expose publicly.
impl FingertipsError {
/// Expose the inner error kind.
///
/// This is useful for matching on the error kind.
pub fn into_inner(self) -> FingertipsErrorKind {
self.0
}
}
/// [`FingertipsErrorKind`] describes the errors that can happen while executing a high-level command.
///
/// This is a non-exhaustive enum, so additional variants may be added in future. It is
/// recommended to match against the wildcard `_` instead of listing all possible variants,
/// to avoid problems when new variants are added.
#[non_exhaustive]
#[derive(thiserror::Error, Debug, displaydoc::Display)]
pub enum FingertipsErrorKind {
/// An error occurred while reading from or writing to a file.
#[error(transparent)]
Io(#[from] std::io::Error),
/// An error occurred while parsing a file
TermEmpty,
/// An error occurred in the algorithm
AlgorithmError,
/// No entry to move
NoEntryToMove,
/// Computer not big enough to hold index entry; you may be on a 32-bit platform
PlatformLimitExceeded,
}
trait FingertipsErrorMarker: Error {}
// impl FingertipsErrorMarker for FingertipErrorsInTheCodeBase {}
impl<E> From<E> for FingertipsError
where
E: FingertipsErrorMarker,
FingertipsErrorKind: From<E>,
{
fn from(value: E) -> Self {
Self(FingertipsErrorKind::from(value))
}
}
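
To make the marker-trait plumbing concrete, here is a hypothetical sketch (not part of this PR; `ParseError` and its `FingertipsErrorKind` variant are invented for illustration): any error type that implements `FingertipsErrorMarker`, and that `FingertipsErrorKind` can be built `From`, converts into `FingertipsError` automatically via the blanket impl above.

// Hypothetical module-local error type opting into the blanket conversion.
#[derive(thiserror::Error, Debug)]
#[error("could not parse document: {0}")]
struct ParseError(String);

impl FingertipsErrorMarker for ParseError {}

// Assuming a matching variant were added to `FingertipsErrorKind`:
//     /// An error occurred while parsing a document
//     Parse(#[from] ParseError),
// callers could then simply write `Err(ParseError(...))?` inside any
// function returning `FingertipsResult<T>`.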

src/index.rs
@@ -4,8 +4,10 @@
//! `InMemoryIndex` can be used to do that, up to the size of the machine's
//! memory.
use std::collections::HashMap;
use byteorder::{LittleEndian, WriteBytesExt};
use std::collections::HashMap;
use crate::error::{FingertipsErrorKind, FingertipsResult};
/// Break a string into words.
fn tokenize(text: &str) -> Vec<&str> {
@@ -21,6 +23,7 @@ fn tokenize(text: &str) -> Vec<&str> {
/// answer simple search queries. And you can use the `read`, `write`, and
/// `merge` modules to save an in-memory index to disk and merge it with other
/// indices, producing a large index.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct InMemoryIndex {
/// The total number of words in the indexed documents.
pub word_count: usize,
@@ -34,7 +37,7 @@ pub struct InMemoryIndex {
/// document id in increasing order. This is handy for some algorithms you
/// might want to run on the index, so we preserve this property wherever
/// possible.
pub map: HashMap<String, Vec<Hit>>
pub map: HashMap<String, Vec<Hit>>,
}
/// A `Hit` indicates that a particular document contains some term, how many
@@ -47,40 +50,50 @@ pub type Hit = Vec<u8>;
impl InMemoryIndex {
/// Create a new, empty index.
pub fn new() -> InMemoryIndex {
InMemoryIndex {
word_count: 0,
map: HashMap::new()
}
pub fn new() -> Self {
Self::default()
}
/// Index a single document.
///
/// The resulting index contains exactly one `Hit` per term.
pub fn from_single_document(document_id: usize, text: String) -> InMemoryIndex {
pub fn from_single_document(document_id: usize, text: String) -> FingertipsResult<Self> {
let document_id = document_id as u32;
let mut index = InMemoryIndex::new();
let mut index = Self::new();
let text = text.to_lowercase();
let tokens = tokenize(&text);
let hits_list = {
let mut hits = Vec::with_capacity(4 + 4);
hits.write_u32::<LittleEndian>(document_id)
.map_err(FingertipsErrorKind::Io)?;
vec![hits]
};
for (i, token) in tokens.iter().enumerate() {
let hits =
index.map
.entry(token.to_string())
.or_insert_with(|| {
let mut hits = Vec::with_capacity(4 + 4);
hits.write_u32::<LittleEndian>(document_id).unwrap();
vec![hits]
});
hits[0].write_u32::<LittleEndian>(i as u32).unwrap();
let hits = index
.map
.entry((*token).to_string())
.or_insert(hits_list.clone());
hits[0]
.write_u32::<LittleEndian>(i as u32)
.map_err(FingertipsErrorKind::Io)?;
index.word_count += 1;
}
if document_id % 100 == 0 {
println!("indexed document {}, {} bytes, {} words", document_id, text.len(), index.word_count);
println!(
"indexed document {}, {} bytes, {} words",
document_id,
text.len(),
index.word_count
);
}
index
Ok(index)
}
/// Add all search hits from `other` to this index.
@@ -88,11 +101,9 @@ impl InMemoryIndex {
/// If both `*self` and `other` are sorted by document id, and all document
/// ids in `other` are greater than every document id in `*self`, then
/// `*self` remains sorted by document id after merging.
pub fn merge(&mut self, other: InMemoryIndex) {
pub fn merge(&mut self, other: Self) {
for (term, hits) in other.map {
self.map.entry(term)
.or_insert_with(|| vec![])
.extend(hits)
self.map.entry(term).or_default().extend(hits);
}
self.word_count += other.word_count;
}
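
A quick sketch of the refactored API (illustrative, not from the PR): indexing two documents and merging keeps every term's hits sorted by document id, since the ids in the second index are larger.

// Hypothetical usage of InMemoryIndex after this change.
fn build_index() -> FingertipsResult<InMemoryIndex> {
    let mut index = InMemoryIndex::from_single_document(0, "hello world".to_string())?;
    let newer = InMemoryIndex::from_single_document(1, "hello again".to_string())?;
    index.merge(newer); // ids in `newer` are all larger, so sortedness is preserved
    Ok(index)
}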

src/lib.rs
@@ -1,23 +1,24 @@
/// `fingertips` creates an inverted index for a set of text files.
///
/// Most of the actual work is done by the modules `index`, `read`, `write`,
/// and `merge`. In this file, `main.rs`, we put the pieces together in two
/// different ways.
///
/// * `run_single_threaded` simply does everything in one thread, in
/// the most straightforward possible way.
///
/// * Then, we break the work into a five-stage pipeline so that we can run
/// it on multiple CPUs. `run_pipeline` puts the five stages together.
///
/// The `main` function at the end handles command-line arguments. It calls one
/// of the two functions above to do the work.
//! `fingertips` creates an inverted index for a set of text files.
//!
//! Most of the actual work is done by the modules `index`, `read`, `write`,
//! and `merge`. In this file, `lib.rs`, we put the pieces together in two
//! different ways.
//!
//! * `run_single_threaded` simply does everything in one thread, in
//! the most straightforward possible way.
//!
//! * Then, we break the work into a five-stage pipeline so that we can run
//! it on multiple CPUs. `run_pipeline` puts the five stages together.
//!
//! The `run` function at the end takes the command-line arguments and calls
//! one of the two functions above to do the work.
mod error;
mod index;
mod read;
mod write;
mod merge;
mod read;
mod tmp;
mod write;
use std::fs::File;
use std::io;
@@ -25,18 +26,15 @@ use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, Receiver};
use std::thread::{spawn, JoinHandle};
use argparse::{ArgumentParser, StoreTrue, Collect};
use crate::index::InMemoryIndex;
use crate::write::write_index_to_tmp_file;
use crate::merge::FileMerge;
use crate::tmp::TmpDir;
use crate::write::write_index_to_tmp_file;
use crate::{error::FingertipsErrorKind, merge::FileMerge};
use crate::{error::FingertipsResult, index::InMemoryIndex};
/// Create an inverted index for the given list of `documents`,
/// storing it in the specified `output_dir`.
fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
-> io::Result<()>
{
fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf) -> FingertipsResult<()> {
// If all the documents fit comfortably in memory, we'll create the whole
// index in memory.
let mut accumulated_index = InMemoryIndex::new();
@@ -52,16 +50,19 @@ fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
// For each document in the set...
for (doc_id, filename) in documents.into_iter().enumerate() {
// ...load it into memory...
let mut f = File::open(filename)?;
let mut f = File::open(filename).map_err(FingertipsErrorKind::Io)?;
let mut text = String::new();
f.read_to_string(&mut text)?;
_ = f
.read_to_string(&mut text)
.map_err(FingertipsErrorKind::Io)?;
// ...and add its contents to the in-memory `accumulated_index`.
let index = InMemoryIndex::from_single_document(doc_id, text);
let index = InMemoryIndex::from_single_document(doc_id, text)?;
accumulated_index.merge(index);
if accumulated_index.is_large() {
// To avoid running out of memory, dump `accumulated_index` to disk.
let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)
.map_err(FingertipsErrorKind::Io)?;
merge.add_file(file)?;
accumulated_index = InMemoryIndex::new();
}
@@ -70,10 +71,13 @@ fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
// Done reading documents! Save the last data set to disk, then merge the
// temporary index files if there are more than one.
if !accumulated_index.is_empty() {
let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)
.map_err(FingertipsErrorKind::Io)?;
merge.add_file(file)?;
}
merge.finish()
merge.finish()?;
Ok(())
}
/// Start a thread that loads documents from the filesystem into memory.
@@ -83,16 +87,18 @@ fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
/// This returns a pair of values: a receiver that receives the documents, as
/// Strings; and a `JoinHandle` that can be used to wait for this thread to
/// exit and to get the error value if anything goes wrong.
fn start_file_reader_thread(documents: Vec<PathBuf>)
-> (Receiver<String>, JoinHandle<io::Result<()>>)
{
fn start_file_reader_thread(
documents: Vec<PathBuf>,
) -> (Receiver<String>, JoinHandle<FingertipsResult<()>>) {
let (sender, receiver) = channel();
let handle = spawn(move || {
for filename in documents {
let mut f = File::open(filename)?;
let mut f = File::open(filename).map_err(FingertipsErrorKind::Io)?;
let mut text = String::new();
f.read_to_string(&mut text)?;
_ = f
.read_to_string(&mut text)
.map_err(FingertipsErrorKind::Io)?;
if sender.send(text).is_err() {
break;
@@ -113,14 +119,16 @@ fn start_file_reader_thread(documents: Vec<PathBuf>)
/// receiver, the sequence of in-memory indexes; and a `JoinHandle` that can be
/// used to wait for this thread to exit. This stage of the pipeline is
/// infallible (it performs no I/O, so there are no possible errors).
fn start_file_indexing_thread(texts: Receiver<String>)
-> (Receiver<InMemoryIndex>, JoinHandle<()>)
{
#[allow(clippy::expect_used)]
fn start_file_indexing_thread(
texts: Receiver<String>,
) -> (Receiver<InMemoryIndex>, JoinHandle<()>) {
let (sender, receiver) = channel();
let handle = spawn(move || {
for (doc_id, text) in texts.into_iter().enumerate() {
let index = InMemoryIndex::from_single_document(doc_id, text);
let index = InMemoryIndex::from_single_document(doc_id, text)
.expect("InMemoryIndex::from_single_document should not fail in this context");
if sender.send(index).is_err() {
break;
}
@@ -143,9 +151,9 @@ fn start_file_indexing_thread(texts: Receiver<String>)
/// merging the input indexes; and a `JoinHandle` that can be used to wait for
/// this thread to exit. This stage of the pipeline is infallible (it performs
/// no I/O).
fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
-> (Receiver<InMemoryIndex>, JoinHandle<()>)
{
fn start_in_memory_merge_thread(
file_indexes: Receiver<InMemoryIndex>,
) -> (Receiver<InMemoryIndex>, JoinHandle<()>) {
let (sender, receiver) = channel();
let handle = spawn(move || {
@@ -175,16 +183,17 @@ fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
/// This returns a pair: a receiver that receives the filenames; and a
/// `JoinHandle` that can be used to wait for this thread to exit and receive
/// any I/O errors it encountered.
fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,
output_dir: &Path)
-> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)
{
fn start_index_writer_thread(
big_indexes: Receiver<InMemoryIndex>,
output_dir: &Path,
) -> (Receiver<PathBuf>, JoinHandle<FingertipsResult<()>>) {
let (sender, receiver) = channel();
let mut tmp_dir = TmpDir::new(output_dir);
let handle = spawn(move || {
for index in big_indexes {
let file = write_index_to_tmp_file(index, &mut tmp_dir)?;
let file =
write_index_to_tmp_file(index, &mut tmp_dir).map_err(FingertipsErrorKind::Io)?;
if sender.send(file).is_err() {
break;
}
@@ -197,14 +206,14 @@ fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,
/// Given a sequence of filenames of index data files, merge all the files
/// into a single index data file.
fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
-> io::Result<()>
{
fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path) -> FingertipsResult<()> {
let mut merge = FileMerge::new(output_dir);
for file in files {
merge.add_file(file)?;
}
merge.finish()
merge.finish()?;
Ok(())
}
/// Create an inverted index for the given list of `documents`,
@@ -213,27 +222,27 @@ fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
/// On success this does exactly the same thing as `run_single_threaded`, but
/// faster since it uses multiple CPUs and keeps them busy while I/O is
/// happening.
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
-> io::Result<()>
{
#[allow(clippy::expect_used)]
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf) -> FingertipsResult<()> {
// Launch all five stages of the pipeline.
let (texts, h1) = start_file_reader_thread(documents);
let (pints, h2) = start_file_indexing_thread(texts);
let (texts, h1) = start_file_reader_thread(documents);
let (pints, h2) = start_file_indexing_thread(texts);
let (gallons, h3) = start_in_memory_merge_thread(pints);
let (files, h4) = start_index_writer_thread(gallons, &output_dir);
let (files, h4) = start_index_writer_thread(gallons, &output_dir);
let result = merge_index_files(files, &output_dir);
// Wait for threads to finish, holding on to any errors that they encounter.
let r1 = h1.join().unwrap();
h2.join().unwrap();
h3.join().unwrap();
let r4 = h4.join().unwrap();
let r1 = h1.join().expect("File reader thread panicked!");
h2.join().expect("In-memory indexing thread panicked!");
h3.join().expect("In-memory merge thread panicked!");
let r4 = h4.join().expect("Index writer thread panicked!");
// Return the first error encountered, if any.
// (As it happens, h2 and h3 can't fail: those threads
// are pure in-memory data processing.)
r1?;
r4?;
result
}
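
The whimsical names aside (`pints` flow into `gallons`), each `start_*_thread` stage above follows the same shape: spawn a worker that drains its input `Receiver`, sends results downstream, and exits quietly when the downstream end hangs up. A stripped-down sketch of one stage, for illustration only:

use std::sync::mpsc::{channel, Receiver};
use std::thread::{spawn, JoinHandle};

// Minimal single-stage skeleton mirroring the start_*_thread functions.
fn start_stage(input: Receiver<String>) -> (Receiver<usize>, JoinHandle<()>) {
    let (sender, receiver) = channel();
    let handle = spawn(move || {
        for text in input {
            if sender.send(text.len()).is_err() {
                break; // receiver dropped; shut this stage down
            }
        }
    });
    (receiver, handle)
}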
@@ -263,9 +272,9 @@ fn expand_filename_arguments(args: Vec<String>) -> io::Result<Vec<PathBuf>> {
}
/// Generate an index for a bunch of text files.
fn run(filenames: Vec<String>, single_threaded: bool) -> io::Result<()> {
pub fn run(filenames: Vec<String>, single_threaded: bool) -> FingertipsResult<()> {
let output_dir = PathBuf::from(".");
let documents = expand_filename_arguments(filenames)?;
let documents = expand_filename_arguments(filenames).map_err(FingertipsErrorKind::Io)?;
if single_threaded {
run_single_threaded(documents, output_dir)
@@ -273,27 +282,3 @@ fn run(filenames: Vec<String>, single_threaded: bool) -> io::Result<()> {
run_pipeline(documents, output_dir)
}
}
fn main() {
let mut single_threaded = false;
let mut filenames = vec![];
{
let mut ap = ArgumentParser::new();
ap.set_description("Make an inverted index for searching documents.");
ap.refer(&mut single_threaded)
.add_option(&["-1", "--single-threaded"], StoreTrue,
"Do all the work on a single thread.");
ap.refer(&mut filenames)
.add_argument("filenames", Collect,
"Names of files/directories to index. \
For directories, all .txt files immediately \
under the directory are indexed.");
ap.parse_args_or_exit();
}
match run(filenames, single_threaded) {
Ok(()) => {}
Err(err) => println!("error: {}", err)
}
}

src/merge.rs
@@ -1,43 +1,60 @@
use std::fs::{self, File};
use std::io::{self, BufWriter};
use std::mem;
use std::path::{Path, PathBuf};
use std::{fmt, mem};
use std::{
fmt::Debug,
fs::{self, File},
};
use std::{
fmt::Formatter,
io::{self, BufWriter},
};
use crate::tmp::TmpDir;
use crate::read::IndexFileReader;
use crate::write::IndexFileWriter;
use crate::{error::FingertipsErrorKind, tmp::TmpDir};
use crate::{error::FingertipsResult, read::IndexFileReader};
pub(crate) mod constants {
// How many files to merge at a time, at most.
pub const NSTREAMS: usize = 8;
pub const MERGED_FILENAME: &str = "index.dat";
}
#[derive(Clone)]
pub struct FileMerge {
output_dir: PathBuf,
tmp_dir: TmpDir,
stacks: Vec<Vec<PathBuf>>
stacks: Vec<Vec<PathBuf>>,
}
// How many files to merge at a time, at most.
const NSTREAMS: usize = 8;
const MERGED_FILENAME: &'static str = "index.dat";
impl Debug for FileMerge {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("FileMerge")
.field("output_dir", &self.output_dir)
.field("stacks", &self.stacks)
.finish()
}
}
impl FileMerge {
pub fn new(output_dir: &Path) -> FileMerge {
FileMerge {
pub fn new(output_dir: &Path) -> Self {
Self {
output_dir: output_dir.to_owned(),
tmp_dir: TmpDir::new(output_dir.to_owned()),
stacks: vec![]
tmp_dir: TmpDir::new(output_dir),
stacks: vec![],
}
}
pub fn add_file(&mut self, mut file: PathBuf) -> io::Result<()> {
pub fn add_file(&mut self, mut file: PathBuf) -> FingertipsResult<()> {
let mut level = 0;
loop {
if level == self.stacks.len() {
self.stacks.push(vec![]);
}
self.stacks[level].push(file);
if self.stacks[level].len() < NSTREAMS {
if self.stacks[level].len() < constants::NSTREAMS {
break;
}
let (filename, out) = self.tmp_dir.create()?;
let (filename, out) = self.tmp_dir.create().map_err(FingertipsErrorKind::Io)?;
let mut to_merge = vec![];
mem::swap(&mut self.stacks[level], &mut to_merge);
merge_streams(to_merge, out)?;
@@ -47,12 +64,12 @@ impl FileMerge {
Ok(())
}
pub fn finish(mut self) -> io::Result<()> {
let mut tmp = Vec::with_capacity(NSTREAMS);
pub fn finish(mut self) -> FingertipsResult<()> {
let mut tmp = Vec::with_capacity(constants::NSTREAMS);
for stack in self.stacks {
for file in stack.into_iter().rev() {
tmp.push(file);
if tmp.len() == NSTREAMS {
if tmp.len() == constants::NSTREAMS {
merge_reversed(&mut tmp, &mut self.tmp_dir)?;
}
}
@@ -62,25 +79,28 @@ impl FileMerge {
merge_reversed(&mut tmp, &mut self.tmp_dir)?;
}
assert!(tmp.len() <= 1);
match tmp.pop() {
Some(last_file) =>
fs::rename(last_file, self.output_dir.join(MERGED_FILENAME)),
None =>
Err(io::Error::new(io::ErrorKind::Other,
"no documents were parsed or none contained any words"))
if let Some(last_file) = tmp.pop() {
fs::rename(last_file, self.output_dir.join(constants::MERGED_FILENAME))
.map_err(|err| FingertipsErrorKind::Io(err).into())
} else {
Err(FingertipsErrorKind::from(io::Error::new(
io::ErrorKind::Other,
"no documents were parsed or none contained any words",
))
.into())
}
}
}
fn merge_streams(files: Vec<PathBuf>, out: BufWriter<File>)
-> io::Result<()>
{
let mut streams: Vec<IndexFileReader> =
files.into_iter()
.map(IndexFileReader::open_and_delete)
.collect::<io::Result<_>>()?;
fn merge_streams(files: Vec<PathBuf>, out: BufWriter<File>) -> FingertipsResult<()> {
let mut streams: Vec<IndexFileReader> = files
.into_iter()
.map(IndexFileReader::open_and_delete)
.map(|result| result.map_err(|err| FingertipsErrorKind::Io(err).into()))
.collect::<FingertipsResult<_>>()?;
let mut output = IndexFileWriter::new(out)?;
let mut output = IndexFileWriter::new(out).map_err(FingertipsErrorKind::Io)?;
let mut point: u64 = 0;
let mut count = streams.iter().filter(|s| s.peek().is_some()).count();
@@ -92,18 +112,20 @@ fn merge_streams(files: Vec<PathBuf>, out: BufWriter<File>)
match s.peek() {
None => {}
Some(entry) => {
if term.is_none() || entry.term < *term.as_ref().unwrap() {
if term.is_none()
|| entry.term < *term.as_ref().ok_or(FingertipsErrorKind::TermEmpty)?
{
term = Some(entry.term.clone()); // XXX LAME clone
nbytes = entry.nbytes;
df = entry.df;
} else if entry.term == *term.as_ref().unwrap() {
} else if entry.term == *term.as_ref().ok_or(FingertipsErrorKind::TermEmpty)? {
nbytes += entry.nbytes;
df += entry.df;
}
}
}
}
let term = term.expect("bug in algorithm!");
let term = term.ok_or(FingertipsErrorKind::AlgorithmError)?;
for s in &mut streams {
if s.is_at(&term) {
@@ -113,18 +135,22 @@ fn merge_streams(files: Vec<PathBuf>, out: BufWriter<File>)
}
}
}
output.write_contents_entry(term, df, point, nbytes as u64);
point += nbytes as u64;
output
.write_contents_entry(term, df, point, nbytes)
.map_err(FingertipsErrorKind::Io)?;
point += nbytes;
}
assert!(streams.iter().all(|s| s.peek().is_none()));
output.finish()
Ok(output.finish().map_err(FingertipsErrorKind::Io)?)
}
fn merge_reversed(filenames: &mut Vec<PathBuf>, tmp_dir: &mut TmpDir) -> io::Result<()> {
fn merge_reversed(filenames: &mut Vec<PathBuf>, tmp_dir: &mut TmpDir) -> FingertipsResult<()> {
filenames.reverse();
let (merged_filename, out) = tmp_dir.create()?;
let mut to_merge = Vec::with_capacity(NSTREAMS);
let (merged_filename, out) = tmp_dir.create().map_err(FingertipsErrorKind::Io)?;
let mut to_merge = Vec::with_capacity(constants::NSTREAMS);
mem::swap(filenames, &mut to_merge);
merge_streams(to_merge, out)?;
filenames.push(merged_filename);
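
For orientation, the intended call sequence for `FileMerge` (a sketch mirroring `merge_index_files` in lib.rs, not code from this PR):

// Hypothetical driver; `files` would be paths from write_index_to_tmp_file.
fn merge_all(files: Vec<PathBuf>, output_dir: &Path) -> FingertipsResult<()> {
    let mut merge = FileMerge::new(output_dir);
    for file in files {
        merge.add_file(file)?; // merges eagerly whenever a stack fills to NSTREAMS (8)
    }
    merge.finish() // collapses the remaining stacks into a single index.dat
}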

src/read.rs
@@ -1,12 +1,15 @@
//! Reading index files linearly from disk, a capability needed for merging
//! index files.
use crate::{
error::{FingertipsErrorKind, FingertipsResult},
write::IndexFileWriter,
};
use byteorder::{LittleEndian, ReadBytesExt};
use std::fs::{self, File};
use std::io::prelude::*;
use std::io::{self, BufReader, SeekFrom};
use std::path::Path;
use byteorder::{LittleEndian, ReadBytesExt};
use crate::write::IndexFileWriter;
/// A `IndexFileReader` does a single linear pass over an index file from
/// beginning to end. Needless to say, this is not how an index is normally
@@ -30,7 +33,7 @@ pub struct IndexFileReader {
/// The next entry in the table of contents, if any; or `None` if we've
/// reached the end of the table. `IndexFileReader` always reads ahead one
/// entry in the contents and stores it here.
next: Option<Entry>
next: Option<Entry>,
}
/// An entry in the table of contents of an index file.
@@ -38,6 +41,7 @@ pub struct IndexFileReader {
/// Each entry in the table of contents is small. It consists of a string, the
/// `term`; summary information about that term, as used in the corpus (`df`);
/// and a pointer to bulkier data that tells more (`offset` and `nbytes`).
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub struct Entry {
/// The term is a word that appears in one or more documents in the corpus.
/// The index file contains information about the documents that use this
@@ -51,7 +55,7 @@ pub struct Entry {
pub offset: u64,
/// Length of the index data for this term, in bytes.
pub nbytes: u64
pub nbytes: u64,
}
impl IndexFileReader {
@@ -62,31 +66,35 @@ impl IndexFileReader {
/// from its directory, but it'll still take up space on disk until the
/// file is closed, which normally happens when the `IndexFileReader` is
/// dropped.
pub fn open_and_delete<P: AsRef<Path>>(filename: P) -> io::Result<IndexFileReader> {
pub fn open_and_delete<P: AsRef<Path>>(filename: P) -> io::Result<Self> {
let filename = filename.as_ref();
let mut main_raw = File::open(filename)?;
// Read the file header.
let contents_offset = main_raw.read_u64::<LittleEndian>()?;
println!("opened {}, table of contents starts at {}", filename.display(), contents_offset);
println!(
"opened {}, table of contents starts at {}",
filename.display(),
contents_offset
);
// Open again so we have two read heads;
// move the contents read head to its starting position.
// Set up buffering.
let mut contents_raw = File::open(filename)?;
contents_raw.seek(SeekFrom::Start(contents_offset))?;
let _start = contents_raw.seek(SeekFrom::Start(contents_offset))?;
let main = BufReader::new(main_raw);
let mut contents = BufReader::new(contents_raw);
// We always read ahead one entry, so load the first entry right away.
let first = IndexFileReader::read_entry(&mut contents)?;
let first = Self::read_entry(&mut contents)?;
fs::remove_file(filename)?; // YOLO
fs::remove_file(filename)?; // YOLO
Ok(IndexFileReader {
main: main,
contents: contents,
next: first
Ok(Self {
main,
contents,
next: first,
})
}
@@ -98,30 +106,30 @@ impl IndexFileReader {
// that's considered a success, with no entry read.
let offset = match f.read_u64::<LittleEndian>() {
Ok(value) => value,
Err(err) =>
Err(err) => {
if err.kind() == io::ErrorKind::UnexpectedEof {
return Ok(None)
return Ok(None);
} else {
return Err(err)
return Err(err);
}
}
};
let nbytes = f.read_u64::<LittleEndian>()?;
let df = f.read_u32::<LittleEndian>()?;
let term_len = f.read_u32::<LittleEndian>()? as usize;
let mut bytes = Vec::with_capacity(term_len);
bytes.resize(term_len, 0);
let mut bytes = vec![0; term_len];
f.read_exact(&mut bytes)?;
let term = match String::from_utf8(bytes) {
Ok(s) => s,
Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "unicode fail"))
Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "unicode fail")),
};
Ok(Some(Entry {
term: term,
df: df,
offset: offset,
nbytes: nbytes
term,
df,
offset,
nbytes,
}))
}
@@ -129,35 +137,41 @@ impl IndexFileReader {
/// (Since we always read ahead one entry, this method can't fail.)
///
/// Returns `None` if we've reached the end of the file.
pub fn peek(&self) -> Option<&Entry> { self.next.as_ref() }
pub fn peek(&self) -> Option<&Entry> {
self.next.as_ref()
}
/// True if the next entry is for the given term.
pub fn is_at(&self, term: &str) -> bool {
match self.next {
Some(ref e) => e.term == term,
None => false
None => false,
}
}
/// Copy the current entry to the specified output stream, then read the
/// header for the next entry.
pub fn move_entry_to(&mut self, out: &mut IndexFileWriter) -> io::Result<()> {
pub fn move_entry_to(&mut self, out: &mut IndexFileWriter) -> FingertipsResult<()> {
// This block limits the scope of borrowing `self.next` (for `e`),
// because after this block is over we'll want to assign to `self.next`.
{
let e = self.next.as_ref().expect("no entry to move");
let e = self
.next
.as_ref()
.ok_or(FingertipsErrorKind::NoEntryToMove)?;
if e.nbytes > usize::max_value() as u64 {
// This can only happen on 32-bit platforms.
return Err(io::Error::new(io::ErrorKind::Other,
"computer not big enough to hold index entry"));
return Err(FingertipsErrorKind::PlatformLimitExceeded.into());
}
let mut buf = Vec::with_capacity(e.nbytes as usize);
buf.resize(e.nbytes as usize, 0);
self.main.read_exact(&mut buf)?;
out.write_main(&buf)?;
let mut buf = vec![0; e.nbytes as usize];
self.main
.read_exact(&mut buf)
.map_err(FingertipsErrorKind::Io)?;
out.write_main(&buf).map_err(FingertipsErrorKind::Io)?;
}
self.next = Self::read_entry(&mut self.contents)?;
self.next = Self::read_entry(&mut self.contents).map_err(FingertipsErrorKind::Io)?;
Ok(())
}
}
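
For reference, the on-disk layout of one table-of-contents entry, reconstructed from the reads in `read_entry` above (all integers little-endian):

// u64  offset    -- position of this term's hit data in the main section
// u64  nbytes    -- length of that hit data, in bytes
// u32  df        -- document frequency (number of documents containing the term)
// u32  term_len  -- length of the term, in bytes
// [u8; term_len] -- the term itself, UTF-8 encoded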

src/tmp.rs
@@ -1,39 +1,41 @@
use std::io::{self, BufWriter};
use std::fs::{self, File};
use std::io::{self, BufWriter};
use std::path::{Path, PathBuf};
#[derive(Clone)]
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct TmpDir {
dir: PathBuf,
n: usize
n: usize,
}
impl TmpDir {
pub fn new<P: AsRef<Path>>(dir: P) -> TmpDir {
TmpDir {
pub fn new<P: AsRef<Path>>(dir: P) -> Self {
Self {
dir: dir.as_ref().to_owned(),
n: 1
n: 1,
}
}
pub fn create(&mut self) -> io::Result<(PathBuf, BufWriter<File>)> {
let mut r#try = 1;
loop {
let filename = self.dir.join(PathBuf::from(format!("tmp{:08x}.dat", self.n)));
let filename = self
.dir
.join(PathBuf::from(format!("tmp{:08x}.dat", self.n)));
self.n += 1;
match fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&filename)
.write(true)
.create_new(true)
.open(&filename)
{
Ok(f) =>
return Ok((filename, BufWriter::new(f))),
Err(exc) =>
Ok(f) => return Ok((filename, BufWriter::new(f))),
Err(exc) => {
if r#try < 999 && exc.kind() == io::ErrorKind::AlreadyExists {
// keep going
} else {
return Err(exc);
}
}
}
r#try += 1;
}

src/write.rs
@@ -1,10 +1,10 @@
use std::fs::File;
use std::io::{self, BufWriter, SeekFrom};
use std::io::prelude::*;
use std::path::PathBuf;
use crate::index::InMemoryIndex;
use crate::tmp::TmpDir;
use byteorder::{LittleEndian, WriteBytesExt};
use std::fs::File;
use std::io::prelude::*;
use std::io::{self, BufWriter, SeekFrom};
use std::path::PathBuf;
/// Writer for saving an index to a binary file.
///
@@ -15,6 +15,7 @@ use byteorder::{LittleEndian, WriteBytesExt};
/// An index file has two parts. The main part of the file is a sequence of
/// entries, stored back-to-back; the
#[derive(Debug)]
pub struct IndexFileWriter {
/// The number of bytes written so far.
offset: u64,
@@ -23,17 +24,17 @@ pub struct IndexFileWriter {
writer: BufWriter<File>,
/// The table of contents for this file.
contents_buf: Vec<u8>
contents_buf: Vec<u8>,
}
impl IndexFileWriter {
pub fn new(mut f: BufWriter<File>) -> io::Result<IndexFileWriter> {
pub fn new(mut f: BufWriter<File>) -> io::Result<Self> {
const HEADER_SIZE: u64 = 8;
f.write_u64::<LittleEndian>(0)?;
Ok(IndexFileWriter {
Ok(Self {
offset: HEADER_SIZE,
writer: f,
contents_buf: vec![]
contents_buf: vec![],
})
}
@@ -43,21 +44,34 @@
Ok(())
}
pub fn write_contents_entry(&mut self, term: String, df: u32, offset: u64, nbytes: u64) {
self.contents_buf.write_u64::<LittleEndian>(offset).unwrap();
self.contents_buf.write_u64::<LittleEndian>(nbytes).unwrap();
self.contents_buf.write_u32::<LittleEndian>(df).unwrap();
pub fn write_contents_entry(
&mut self,
term: String,
df: u32,
offset: u64,
nbytes: u64,
) -> io::Result<()> {
self.contents_buf.write_u64::<LittleEndian>(offset)?;
self.contents_buf.write_u64::<LittleEndian>(nbytes)?;
self.contents_buf.write_u32::<LittleEndian>(df)?;
let bytes = term.bytes();
self.contents_buf.write_u32::<LittleEndian>(bytes.len() as u32).unwrap();
self.contents_buf
.write_u32::<LittleEndian>(bytes.len() as u32)?;
self.contents_buf.extend(bytes);
Ok(())
}
/// Finish writing the index file and close it.
pub fn finish(mut self) -> io::Result<()> {
let contents_start = self.offset;
self.writer.write_all(&self.contents_buf)?;
println!("{} bytes main, {} bytes total", contents_start, contents_start + self.contents_buf.len() as u64);
self.writer.seek(SeekFrom::Start(0))?;
println!(
"{} bytes main, {} bytes total",
contents_start,
contents_start + self.contents_buf.len() as u64
);
let _start = self.writer.seek(SeekFrom::Start(0))?;
self.writer.write_u64::<LittleEndian>(contents_start)?;
Ok(())
}
@@ -70,7 +84,7 @@ pub fn write_index_to_tmp_file(index: InMemoryIndex, tmp_dir: &mut TmpDir) -> io
// The merge algorithm requires the entries within each file to be sorted by term.
// Sort before writing anything.
let mut index_as_vec: Vec<_> = index.map.into_iter().collect();
index_as_vec.sort_by(|&(ref a, _), &(ref b, _)| a.cmp(b));
index_as_vec.sort_by(|(a, _), (b, _)| a.cmp(b));
for (term, hits) in index_as_vec {
let df = hits.len() as u32;
@@ -79,10 +93,11 @@ pub fn write_index_to_tmp_file(index: InMemoryIndex, tmp_dir: &mut TmpDir) -> io
writer.write_main(&buffer)?;
}
let stop = writer.offset;
writer.write_contents_entry(term, df, start, stop - start);
writer.write_contents_entry(term, df, start, stop - start)?;
}
writer.finish()?;
println!("wrote file {:?}", filename);
println!("wrote file {filename:?}");
Ok(filename)
}
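
One detail worth spelling out, derived from `new` and `finish` above: the writer reserves an 8-byte header up front and backpatches it once everything else is written.

// File layout produced by IndexFileWriter (little-endian):
//   bytes 0..8    u64 contents_start -- written as 0 in new(), then
//                 overwritten by finish() after seeking back to offset 0
//   bytes 8..N    main section: hit data appended via write_main()
//   bytes N..EOF  table of contents: contents_buf flushed by finish()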
