From e24ff549a631a2c3d8ebb2fdbba2e7384eef94db Mon Sep 17 00:00:00 2001
From: Jason Orendorff
Date: Tue, 20 Jun 2017 15:04:57 -0500
Subject: [PATCH] fingertips: Comments.

---
 src/index.rs |  47 ++++++++++++++++++---
 src/main.rs  | 114 +++++++++++++++++++++++++++++++++++++--------------
 src/read.rs  |   3 ++
 3 files changed, 129 insertions(+), 35 deletions(-)

diff --git a/src/index.rs b/src/index.rs
index 7228b85..e0c6b9f 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -1,26 +1,52 @@
+//! In-memory indexes.
+//!
+//! The first step in building the index is to index documents in memory.
+//! `InMemoryIndex` can be used to do that, up to the size of the machine's
+//! memory.
+
 use std::collections::HashMap;
 use byteorder::{LittleEndian, WriteBytesExt};
 
+/// Break a string into words.
 fn tokenize(text: &str) -> Vec<&str> {
     text.split(|ch: char| !ch.is_alphanumeric())
         .filter(|word| !word.is_empty())
         .collect()
 }
 
+/// An in-memory index.
+///
+/// Of course, a real index for a large corpus of documents won't fit in
+/// memory. But apart from memory constraints, this is everything you need to
+/// answer simple search queries. And you can use the `read`, `write`, and
+/// `merge` modules to save an in-memory index to disk and merge it with other
+/// indexes, producing a large index.
 pub struct InMemoryIndex {
+    /// The total number of words in the indexed documents.
    pub word_count: usize,
+
+    /// For every term that appears in the index, the list of all search hits
+    /// for that term (i.e. which documents contain that term, and where).
+    ///
+    /// It's possible for an index to be "sorted by document id", which means
+    /// that for every `Vec<Hit>` in this map, the `Hit` elements all have
+    /// distinct document ids (the first u32) and the `Hit`s are arranged by
+    /// document id in increasing order. This is handy for some algorithms you
+    /// might want to run on the index, so we preserve this property wherever
+    /// possible.
    pub map: HashMap<String, Vec<Hit>>
 }
 
-/// A record indicating that a particular document contains some term,
-/// how many times it appears, and at what offsets.
+/// A `Hit` indicates that a particular document contains some term, how many
+/// times it appears, and at what offsets (that is, the word count, from the
+/// beginning of the document, of each place where the term appears).
 ///
-/// The buffer contains all the hit data in binary form, little-endian.
-/// The first u32 of the data is the document id.
-/// The remaining [u32] are offsets.
+/// The buffer contains all the hit data in binary form, little-endian. The
+/// first u32 of the data is the document id. The remaining [u32] are offsets.
 pub type Hit = Vec<u8>;
 
 impl InMemoryIndex {
+    /// Create a new, empty index.
     pub fn new() -> InMemoryIndex {
         InMemoryIndex {
             word_count: 0,
@@ -28,6 +54,9 @@ impl InMemoryIndex {
         }
     }
 
+    /// Index a single document.
+    ///
+    /// The resulting index contains exactly one `Hit` per term.
     pub fn from_single_document(document_id: usize, text: String) -> InMemoryIndex {
         let document_id = document_id as u32;
         let mut index = InMemoryIndex::new();
@@ -54,6 +83,11 @@ impl InMemoryIndex {
         index
     }
 
+    /// Add all search hits from `other` to this index.
+    ///
+    /// If both `*self` and `other` are sorted by document id, and all document
+    /// ids in `other` are greater than every document id in `*self`, then
+    /// `*self` remains sorted by document id after merging.
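+    ///
+    /// A rough usage sketch (illustrative only, not a doc test from this
+    /// patch; the document texts here are made up):
+    ///
+    /// ```no_run
+    /// let mut index = InMemoryIndex::from_single_document(0, "hello world".to_string());
+    /// let other = InMemoryIndex::from_single_document(1, "hello again".to_string());
+    /// index.merge(other);
+    /// // "hello" now has one `Hit` from each document, still sorted by id.
+    /// assert_eq!(index.map["hello"].len(), 2);
+    /// ```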
     pub fn merge(&mut self, other: InMemoryIndex) {
         for (term, hits) in other.map {
             self.map.entry(term)
@@ -63,10 +97,13 @@ impl InMemoryIndex {
         self.word_count += other.word_count;
     }
 
+    /// True if this index contains no data.
     pub fn is_empty(&self) -> bool {
         self.word_count == 0
     }
 
+    /// True if this index is large enough that we should dump it to disk
+    /// rather than keep adding more data to it.
     pub fn is_large(&self) -> bool {
         // This depends on how much memory your computer has, of course.
         const REASONABLE_SIZE: usize = 100_000_000;
diff --git a/src/main.rs b/src/main.rs
index 20e99d1..2b83054 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,18 @@
+//! `fingertips` creates an inverted index for a set of text files.
+//!
+//! Most of the actual work is done by the modules `index`, `read`, `write`,
+//! and `merge`. In this file, `main.rs`, we put the pieces together in two
+//! different ways.
+//!
+//! * `run_single_threaded` simply does everything in one thread, in the most
+//!   straightforward possible way.
+//!
+//! * `run_pipeline` breaks the work into a five-stage pipeline so that we can
+//!   run it on multiple CPUs, keeping them busy while I/O is happening.
+//!
+//! The `main` function at the end handles command-line arguments. It calls
+//! one of the two functions above to do the work.
+
 extern crate argparse;
 extern crate byteorder;
 
@@ -21,6 +36,57 @@ use write::write_index_to_tmp_file;
 use merge::FileMerge;
 use tmp::TmpDir;
 
+/// Create an inverted index for the given list of `documents`,
+/// storing it in the specified `output_dir`.
+fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
+    -> io::Result<()>
+{
+    // If all the documents fit comfortably in memory, we'll create the whole
+    // index in memory.
+    let mut accumulated_index = InMemoryIndex::new();
+
+    // If not, then as memory fills up, we'll write largeish temporary index
+    // files to disk, saving the temporary filenames in `merge` so that later
+    // we can merge them all into a single huge file.
+    let mut merge = FileMerge::new(&output_dir);
+
+    // A tool for generating temporary filenames.
+    let mut tmp_dir = TmpDir::new(&output_dir);
+
+    // For each document in the set...
+    for (doc_id, filename) in documents.into_iter().enumerate() {
+        // ...load it into memory...
+        let mut f = File::open(filename)?;
+        let mut text = String::new();
+        f.read_to_string(&mut text)?;
+
+        // ...and add its contents to the in-memory `accumulated_index`.
+        let index = InMemoryIndex::from_single_document(doc_id, text);
+        accumulated_index.merge(index);
+        if accumulated_index.is_large() {
+            // To avoid running out of memory, dump `accumulated_index` to disk.
+            let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
+            merge.add_file(file)?;
+            accumulated_index = InMemoryIndex::new();
+        }
+    }
+
+    // Done reading documents! Save the last data set to disk, then merge the
+    // temporary index files if there are more than one.
+    if !accumulated_index.is_empty() {
+        let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
+        merge.add_file(file)?;
+    }
+    merge.finish()
+}
+
+/// Start a thread that loads documents from the filesystem into memory.
+///
+/// `documents` is a list of filenames to load.
+///
+/// This returns a pair of values: a receiver that receives the documents, as
+/// `String`s; and a `JoinHandle` that can be used to wait for this thread to
+/// exit and to get the `io::Error` value if anything goes wrong.
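+///
+/// A rough usage sketch (illustrative only; the filenames are made up):
+///
+/// ```no_run
+/// # use std::path::PathBuf;
+/// let documents = vec![PathBuf::from("a.txt"), PathBuf::from("b.txt")];
+/// let (texts, reader) = start_file_reader_thread(documents);
+/// for text in texts {
+///     // ...each `text` is the full contents of one document...
+/// }
+/// // Joining surfaces any I/O error the reader thread encountered.
+/// reader.join().unwrap().expect("I/O error while reading documents");
+/// ```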
 fn start_file_reader_thread(documents: Vec<PathBuf>)
     -> (Receiver<String>, JoinHandle<io::Result<()>>)
 {
@@ -42,6 +108,15 @@ fn start_file_reader_thread(documents: Vec<PathBuf>)
     (receiver, handle)
 }
 
+/// Start a thread that tokenizes each text and converts it into an in-memory
+/// index. (We assume that every document fits comfortably in memory.)
+///
+/// `texts` is the stream of documents from the file reader thread.
+///
+/// This assigns each document a number. It returns a pair of values: a
+/// receiver that yields the in-memory indexes, one per document; and a
+/// `JoinHandle` that can be used to wait for this thread to exit. This stage
+/// of the pipeline is infallible (it performs no I/O, so there are no
+/// possible errors).
 fn start_file_indexing_thread(texts: Receiver<String>)
     -> (Receiver<InMemoryIndex>, JoinHandle<()>)
 {
@@ -113,6 +188,12 @@ fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
     merge.finish()
 }
 
+/// Create an inverted index for the given list of `documents`,
+/// storing it in the specified `output_dir`.
+///
+/// On success this does exactly the same thing as `run_single_threaded`, but
+/// faster since it uses multiple CPUs and keeps them busy while I/O is
+/// happening.
 fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
     -> io::Result<()>
 {
@@ -137,33 +218,6 @@ fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
     result
 }
 
-fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
-    -> io::Result<()>
-{
-    let mut accumulated_index = InMemoryIndex::new();
-    let mut merge = FileMerge::new(&output_dir);
-    let mut tmp_dir = TmpDir::new(&output_dir);
-    for (doc_id, filename) in documents.into_iter().enumerate() {
-        let mut f = File::open(filename)?;
-        let mut text = String::new();
-        f.read_to_string(&mut text)?;
-
-        let index = InMemoryIndex::from_single_document(doc_id, text);
-        accumulated_index.merge(index);
-        if accumulated_index.is_large() {
-            let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
-            merge.add_file(file)?;
-            accumulated_index = InMemoryIndex::new();
-        }
-    }
-
-    if !accumulated_index.is_empty() {
-        let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
-        merge.add_file(file)?;
-    }
-    merge.finish()
-}
-
 fn expand_filename_arguments(args: Vec<String>) -> io::Result<Vec<PathBuf>> {
     let mut filenames = vec![];
     for arg in args {
@@ -192,8 +246,8 @@ fn run(filenames: Vec<String>, single_threaded: bool) -> io::Result<()> {
         run_pipeline(documents, output_dir)
     }
 }
-
-fn main() {
+
+fn main() {
     let mut single_threaded = false;
     let mut filenames = vec![];
 
@@ -210,7 +264,7 @@ fn main() {
                under the directory are indexed.");
         ap.parse_args_or_exit();
     }
-
+
     match run(filenames, single_threaded) {
         Ok(()) => {}
         Err(err) => println!("error: {:?}", err.description())
diff --git a/src/read.rs b/src/read.rs
index 8faebc9..50ece07 100644
--- a/src/read.rs
+++ b/src/read.rs
@@ -1,3 +1,6 @@
+//! Reading index files linearly from disk, a capability needed for merging
+//! index files.
+
 use std::fs::{self, File};
 use std::io::prelude::*;
 use std::io::{self, BufReader, SeekFrom};
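
For context while reviewing: the body of `run_pipeline` is elided by the hunk
context above. Below is a rough sketch of how the five stages might be
chained; the middle-stage helpers `start_in_memory_merge_thread` and
`start_index_writer_thread` are assumptions based on the doc comments and do
not appear in this diff:

    fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
        -> io::Result<()>
    {
        // Each stage consumes the previous stage's channel receiver.
        let (texts, h1) = start_file_reader_thread(documents);
        let (indexes, h2) = start_file_indexing_thread(texts);
        let (big_indexes, h3) = start_in_memory_merge_thread(indexes);          // assumed name
        let (files, h4) = start_index_writer_thread(big_indexes, &output_dir);  // assumed name
        let result = merge_index_files(files, &output_dir);

        // Wait for every thread to finish, keeping the first I/O error, if any.
        let r1 = h1.join().unwrap();
        h2.join().unwrap();  // infallible: pure in-memory work
        h3.join().unwrap();  // infallible: pure in-memory work
        let r4 = h4.join().unwrap();
        r1?;
        r4?;
        result
    }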