fingertips: Comments.

Jason Orendorff, 7 years ago · branch master
commit e24ff549a6 · parent dcbebecb72

@@ -1,26 +1,52 @@
//! In-memory indexes.
//!
//! The first step in building the index is to index documents in memory.
//! `InMemoryIndex` can be used to do that, up to the size of the machine's
//! memory.
use std::collections::HashMap;
use byteorder::{LittleEndian, WriteBytesExt};
/// Break a string into words.
fn tokenize(text: &str) -> Vec<&str> {
text.split(|ch: char| !ch.is_alphanumeric())
.filter(|word| !word.is_empty())
.collect()
}
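// A quick illustration of what `tokenize` produces (a hypothetical test,
// not part of this commit): runs of non-alphanumeric characters act as
// separators, and the empty fragments between them are filtered out.
#[test]
fn tokenize_splits_on_non_alphanumerics() {
    assert_eq!(tokenize("Hello, world!"), vec!["Hello", "world"]);
    assert_eq!(tokenize("state-of-the-art"), vec!["state", "of", "the", "art"]);
}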
/// An in-memory index.
///
/// Of course, a real index for a large corpus of documents won't fit in
/// memory. But apart from memory constraints, this is everything you need to
/// answer simple search queries. And you can use the `read`, `write`, and
/// `merge` modules to save an in-memory index to disk and merge it with other
/// indices, producing a large index.
pub struct InMemoryIndex {
/// The total number of words in the indexed documents.
pub word_count: usize,
/// For every term that appears in the index, the list of all search hits
/// for that term (i.e. which documents contain that term, and where).
///
/// It's possible for an index to be "sorted by document id", which means
/// that for every `Vec<Hit>` in this map, the `Hit` elements all have
/// distinct document ids (the first u32) and the `Hit`s are arranged by
/// document id in increasing order. This is handy for some algorithms you
/// might want to run on the index, so we preserve this property wherever
/// possible.
pub map: HashMap<String, Vec<Hit>>
}
/// A record indicating that a particular document contains some term,
/// how many times it appears, and at what offsets.
/// A `Hit` indicates that a particular document contains some term, how many
/// times it appears, and at what offsets (that is, the word count, from the
/// beginning of the document, of each place where the term appears).
///
/// The buffer contains all the hit data in binary form, little-endian.
/// The first u32 of the data is the document id.
/// The remaining [u32] are offsets.
/// The buffer contains all the hit data in binary form, little-endian. The
/// first u32 of the data is the document id. The remaining [u32] are offsets.
pub type Hit = Vec<u8>;
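// To make the `Hit` layout concrete, here is a hypothetical helper (not in
// this commit) that builds a hit by hand, using the `WriteBytesExt` trait
// imported above. Writing to a `Vec<u8>` cannot fail, so `unwrap()` is safe.
fn make_hit(document_id: u32, offsets: &[u32]) -> Hit {
    let mut hit: Hit = Vec::new();
    hit.write_u32::<LittleEndian>(document_id).unwrap();
    for &offset in offsets {
        hit.write_u32::<LittleEndian>(offset).unwrap();
    }
    hit
}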
impl InMemoryIndex {
/// Create a new, empty index.
pub fn new() -> InMemoryIndex {
InMemoryIndex {
word_count: 0,
@@ -28,6 +54,9 @@ impl InMemoryIndex {
}
}
/// Index a single document.
///
/// The resulting index contains exactly one `Hit` per term.
pub fn from_single_document(document_id: usize, text: String) -> InMemoryIndex {
let document_id = document_id as u32;
let mut index = InMemoryIndex::new();
@@ -54,6 +83,11 @@ impl InMemoryIndex {
index
}
/// Add all search hits from `other` to this index.
///
/// If both `*self` and `other` are sorted by document id, and all document
/// ids in `other` are greater than every document id in `*self`, then
/// `*self` remains sorted by document id after merging.
pub fn merge(&mut self, other: InMemoryIndex) {
for (term, hits) in other.map {
self.map.entry(term)
@@ -63,10 +97,13 @@ impl InMemoryIndex {
self.word_count += other.word_count;
}
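// Hypothetical demo (not in this commit): merging single-document indexes in
// increasing document-id order preserves the "sorted by document id"
// property described above.
fn merge_in_doc_id_order_demo() {
    let mut accumulated = InMemoryIndex::new();
    accumulated.merge(InMemoryIndex::from_single_document(0, "hello world".to_string()));
    accumulated.merge(InMemoryIndex::from_single_document(1, "hello again".to_string()));
    // Every `Vec<Hit>` in `accumulated.map` now lists document 0 before document 1.
}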
/// True if this index contains no data.
pub fn is_empty(&self) -> bool {
self.word_count == 0
}
/// True if this index is large enough that we should dump it to disk rather
/// than keep adding more data to it.
pub fn is_large(&self) -> bool {
// This depends on how much memory your computer has, of course.
const REASONABLE_SIZE: usize = 100_000_000;
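// The diff truncates `is_large` here; presumably the body simply compares
// the running word count against the threshold (a sketch, not the commit's
// verbatim code):
//
//     self.word_count > REASONABLE_SIZE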

@@ -1,3 +1,18 @@
/// `fingertips` creates an inverted index for a set of text files.
///
/// Most of the actual work is done by the modules `index`, `read`, `write`,
/// and `merge`. In this file, `main.rs`, we put the pieces together in two
/// different ways.
///
/// * `run_single_threaded` simply does everything in one thread, in
/// the most straightforward possible way.
///
/// * Then, we break the work into a five-stage pipeline so that we can run
/// it on multiple CPUs. `run_pipeline` puts the five stages together.
///
/// The `main` function at the end handles command-line arguments. It calls one
/// of the two functions above to do the work.
extern crate argparse;
extern crate byteorder;
@@ -21,6 +36,57 @@ use write::write_index_to_tmp_file;
use merge::FileMerge;
use tmp::TmpDir;
/// Create an inverted index for the given list of `documents`,
/// storing it in the specified `output_dir`.
fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
-> io::Result<()>
{
// If all the documents fit comfortably in memory, we'll create the whole
// index in memory.
let mut accumulated_index = InMemoryIndex::new();
// If not, then as memory fills up, we'll write largeish temporary index
// files to disk, saving the temporary filenames in `merge` so that later we
// can merge them all into a single huge file.
let mut merge = FileMerge::new(&output_dir);
// A tool for generating temporary filenames.
let mut tmp_dir = TmpDir::new(&output_dir);
// For each document in the set...
for (doc_id, filename) in documents.into_iter().enumerate() {
// ...load it into memory...
let mut f = File::open(filename)?;
let mut text = String::new();
f.read_to_string(&mut text)?;
// ...and add its contents to the in-memory `accumulated_index`.
let index = InMemoryIndex::from_single_document(doc_id, text);
accumulated_index.merge(index);
if accumulated_index.is_large() {
// To avoid running out of memory, dump `accumulated_index` to disk.
let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
merge.add_file(file)?;
accumulated_index = InMemoryIndex::new();
}
}
// Done reading documents! Save the last data set to disk, then merge the
// temporary index files if there is more than one.
if !accumulated_index.is_empty() {
let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
merge.add_file(file)?;
}
merge.finish()
}
/// Start a thread that loads documents from the filesystem into memory.
///
/// `documents` is a list of filenames to load.
///
/// This returns a pair of values: a receiver that receives the documents, as
/// Strings; and a `JoinHandle` that can be used to wait for this thread to
/// exit and to get the `io::Error` value if anything goes wrong.
fn start_file_reader_thread(documents: Vec<PathBuf>)
-> (Receiver<String>, JoinHandle<io::Result<()>>)
{
@@ -42,6 +108,15 @@ fn start_file_reader_thread(documents: Vec<PathBuf>)
(receiver, handle)
}
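// The hunk above elides the body of `start_file_reader_thread`. Reconstructed
// from its doc comment, it presumably follows the usual channel-plus-spawn
// pattern. This is a sketch under that assumption (with `channel` and `spawn`
// imported from `std::sync::mpsc` and `std::thread`), not verbatim code:
fn start_file_reader_thread_sketch(documents: Vec<PathBuf>)
    -> (Receiver<String>, JoinHandle<io::Result<()>>)
{
    let (sender, receiver) = channel();
    let handle = spawn(move || {
        for filename in documents {
            // Read each file into a String and pass it down the pipeline.
            let mut f = File::open(filename)?;
            let mut text = String::new();
            f.read_to_string(&mut text)?;
            if sender.send(text).is_err() {
                break;  // the receiving end was dropped; stop early
            }
        }
        Ok(())
    });
    (receiver, handle)
}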
/// Start a thread that tokenizes each text and converts it into an in-memory
/// index. (We assume that every document fits comfortably in memory.)
///
/// `texts` is the stream of documents from the file reader thread.
///
/// This assigns each document a number. It returns a pair of values: a
/// receiver, the sequence of in-memory indexes; and a `JoinHandle` that can be
/// used to wait for this thread to exit. This stage of the pipeline is
/// infallible (it performs no I/O, so there are no possible errors).
fn start_file_indexing_thread(texts: Receiver<String>)
-> (Receiver<InMemoryIndex>, JoinHandle<()>)
{
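    // The diff elides the body here. Reconstructed from the doc comment, it
    // presumably numbers each incoming document and indexes it in memory
    // (a sketch, not verbatim code):
    //
    //     let (sender, receiver) = channel();
    //     let handle = spawn(move || {
    //         for (doc_id, text) in texts.into_iter().enumerate() {
    //             let index = InMemoryIndex::from_single_document(doc_id, text);
    //             if sender.send(index).is_err() {
    //                 break;  // the receiving end was dropped; stop early
    //             }
    //         }
    //     });
    //     (receiver, handle)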
@@ -113,6 +188,12 @@ fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
merge.finish()
}
/// Create an inverted index for the given list of `documents`,
/// storing it in the specified `output_dir`.
///
/// On success this does exactly the same thing as `run_single_threaded`, but
/// faster since it uses multiple CPUs and keeps them busy while I/O is
/// happening.
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
-> io::Result<()>
{
@@ -137,33 +218,6 @@ fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
result
}
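// Sketch (hypothetical, not in this commit): the join-and-propagate pattern
// `run_pipeline` presumably uses, wired here with only the two stages this
// diff shows; the real function chains all five. `join()` yields each
// thread's result, and `?` surfaces the reader's I/O error at the end.
fn run_two_stage_pipeline_sketch(documents: Vec<PathBuf>) -> io::Result<InMemoryIndex> {
    let (texts, reader) = start_file_reader_thread(documents);
    let (indexes, indexer) = start_file_indexing_thread(texts);

    // Drain the final channel on the calling thread.
    let mut accumulated = InMemoryIndex::new();
    for index in indexes {
        accumulated.merge(index);
    }

    // Join every worker; only the reader stage can fail with an I/O error.
    let result = reader.join().unwrap();
    indexer.join().unwrap();
    result?;
    Ok(accumulated)
}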
fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
-> io::Result<()>
{
let mut accumulated_index = InMemoryIndex::new();
let mut merge = FileMerge::new(&output_dir);
let mut tmp_dir = TmpDir::new(&output_dir);
for (doc_id, filename) in documents.into_iter().enumerate() {
let mut f = File::open(filename)?;
let mut text = String::new();
f.read_to_string(&mut text)?;
let index = InMemoryIndex::from_single_document(doc_id, text);
accumulated_index.merge(index);
if accumulated_index.is_large() {
let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
merge.add_file(file)?;
accumulated_index = InMemoryIndex::new();
}
}
if !accumulated_index.is_empty() {
let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
merge.add_file(file)?;
}
merge.finish()
}
fn expand_filename_arguments(args: Vec<String>) -> io::Result<Vec<PathBuf>> {
let mut filenames = vec![];
for arg in args {
@@ -192,8 +246,8 @@ fn run(filenames: Vec<String>, single_threaded: bool) -> io::Result<()> {
run_pipeline(documents, output_dir)
}
}
fn main() {
let mut single_threaded = false;
let mut filenames = vec![];
@@ -210,7 +264,7 @@ fn main() {
under the directory are indexed.");
ap.parse_args_or_exit();
}
match run(filenames, single_threaded) {
Ok(()) => {}
Err(err) => println!("error: {:?}", err.description())

@@ -1,3 +1,6 @@
//! Reading index files linearly from disk, a capability needed for merging
//! index files.
use std::fs::{self, File};
use std::io::prelude::*;
use std::io::{self, BufReader, SeekFrom};
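// Connecting this module to the `Hit` format in index.rs: reading an index
// back means pulling little-endian u32 values out of a buffered file. A
// minimal hypothetical helper (not in this commit), using byteorder's
// `ReadBytesExt`:
use byteorder::{LittleEndian, ReadBytesExt};

/// Read one little-endian u32 (a document id or an offset) from the index.
fn read_u32_le<R: Read>(reader: &mut R) -> io::Result<u32> {
    reader.read_u32::<LittleEndian>()
}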
