From dcbebecb72fbbdaffbcf235841654fd5af8e7445 Mon Sep 17 00:00:00 2001
From: Jason Orendorff
Date: Tue, 20 Jun 2017 15:02:08 -0500
Subject: [PATCH] Initial commit.

---
 .gitignore   |   1 +
 Cargo.lock   |  21 +++++
 Cargo.toml   |   8 ++
 src/index.rs |  75 ++++++++++++++++++
 src/main.rs  | 218 +++++++++++++++++++++++++++++++++++++++++++++++++++
 src/merge.rs | 132 +++++++++++++++++++++++++++++++
 src/read.rs  | 115 +++++++++++++++++++++++++++
 src/tmp.rs   |  42 ++++++++++
 src/write.rs |  73 +++++++++++++++++
 9 files changed, 685 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Cargo.lock
 create mode 100644 Cargo.toml
 create mode 100644 src/index.rs
 create mode 100644 src/main.rs
 create mode 100644 src/merge.rs
 create mode 100644 src/read.rs
 create mode 100644 src/tmp.rs
 create mode 100644 src/write.rs

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..eb5a316
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+target
diff --git a/Cargo.lock b/Cargo.lock
new file mode 100644
index 0000000..871d662
--- /dev/null
+++ b/Cargo.lock
@@ -0,0 +1,21 @@
+[root]
+name = "fingertips"
+version = "0.1.0"
+dependencies = [
+ "argparse 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "argparse"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "byteorder"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[metadata]
+"checksum argparse 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "37bb99f5e39ee8b23b6e227f5b8f024207e8616f44aa4b8c76ecd828011667ef"
+"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..b895894
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "fingertips"
+version = "0.1.0"
+authors = ["Jason Orendorff"]
+
+[dependencies]
+argparse = "0.2.1"
+byteorder = "0.5.3"
diff --git a/src/index.rs b/src/index.rs
new file mode 100644
index 0000000..7228b85
--- /dev/null
+++ b/src/index.rs
@@ -0,0 +1,75 @@
+use std::collections::HashMap;
+use byteorder::{LittleEndian, WriteBytesExt};
+
+fn tokenize(text: &str) -> Vec<&str> {
+    text.split(|ch: char| !ch.is_alphanumeric())
+        .filter(|word| !word.is_empty())
+        .collect()
+}
+
+pub struct InMemoryIndex {
+    pub word_count: usize,
+    pub map: HashMap<String, Vec<Hit>>
+}
+
+/// A record indicating that a particular document contains some term,
+/// how many times it appears, and at what offsets.
+///
+/// The buffer contains all the hit data in binary form, little-endian.
+/// The first u32 of the data is the document id.
+/// The remaining [u32] are offsets.
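+///
+/// For example, a hit for document 42 with occurrences at offsets 0 and 7
+/// would be the twelve bytes `2a 00 00 00 00 00 00 00 07 00 00 00`.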
+pub type Hit = Vec<u8>;
+
+impl InMemoryIndex {
+    pub fn new() -> InMemoryIndex {
+        InMemoryIndex {
+            word_count: 0,
+            map: HashMap::new()
+        }
+    }
+
+    pub fn from_single_document(document_id: usize, text: String) -> InMemoryIndex {
+        let document_id = document_id as u32;
+        let mut index = InMemoryIndex::new();
+
+        let text = text.to_lowercase();
+        let tokens = tokenize(&text);
+        for (i, token) in tokens.iter().enumerate() {
+            let hits =
+                index.map
+                .entry(token.to_string())
+                .or_insert_with(|| {
+                    let mut hits = Vec::with_capacity(4 + 4);
+                    hits.write_u32::<LittleEndian>(document_id).unwrap();
+                    vec![hits]
+                });
+            hits[0].write_u32::<LittleEndian>(i as u32).unwrap();
+            index.word_count += 1;
+        }
+
+        if document_id % 100 == 0 {
+            println!("indexed document {}, {} bytes, {} words",
+                     document_id, text.len(), index.word_count);
+        }
+
+        index
+    }
+
+    pub fn merge(&mut self, other: InMemoryIndex) {
+        for (term, hits) in other.map {
+            self.map.entry(term)
+                .or_insert_with(|| vec![])
+                .extend(hits)
+        }
+        self.word_count += other.word_count;
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.word_count == 0
+    }
+
+    pub fn is_large(&self) -> bool {
+        // This depends on how much memory your computer has, of course.
+        const REASONABLE_SIZE: usize = 100_000_000;
+        self.word_count > REASONABLE_SIZE
+    }
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..20e99d1
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,218 @@
+extern crate argparse;
+extern crate byteorder;
+
+mod index;
+mod read;
+mod write;
+mod merge;
+mod tmp;
+
+use std::error::Error;
+use std::fs::File;
+use std::io;
+use std::io::prelude::*;
+use std::path::{Path, PathBuf};
+use std::sync::mpsc::{channel, Receiver};
+use std::thread::{spawn, JoinHandle};
+use argparse::{ArgumentParser, StoreTrue, Collect};
+
+use index::InMemoryIndex;
+use write::write_index_to_tmp_file;
+use merge::FileMerge;
+use tmp::TmpDir;
+
+fn start_file_reader_thread(documents: Vec<PathBuf>)
+    -> (Receiver<String>, JoinHandle<io::Result<()>>)
+{
+    let (sender, receiver) = channel();
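+    // Note: `channel()` is unbounded, so this reader can run arbitrarily far
+    // ahead of the indexing stage; `sync_channel` could be used instead to
+    // put a cap on memory use.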
+
+    let handle = spawn(move || {
+        for filename in documents {
+            let mut f = File::open(filename)?;
+            let mut text = String::new();
+            f.read_to_string(&mut text)?;
+
+            if sender.send(text).is_err() {
+                break;
+            }
+        }
+        Ok(())
+    });
+
+    (receiver, handle)
+}
+
+fn start_file_indexing_thread(texts: Receiver<String>)
+    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
+{
+    let (sender, receiver) = channel();
+
+    let handle = spawn(move || {
+        for (doc_id, text) in texts.into_iter().enumerate() {
+            let index = InMemoryIndex::from_single_document(doc_id, text);
+            if sender.send(index).is_err() {
+                break;
+            }
+        }
+    });
+
+    (receiver, handle)
+}
+
+fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
+    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
+{
+    let (sender, receiver) = channel();
+
+    let handle = spawn(move || {
+        let mut accumulated_index = InMemoryIndex::new();
+        for fi in file_indexes {
+            accumulated_index.merge(fi);
+            if accumulated_index.is_large() {
+                if sender.send(accumulated_index).is_err() {
+                    return;
+                }
+                accumulated_index = InMemoryIndex::new();
+            }
+        }
+        if !accumulated_index.is_empty() {
+            let _ = sender.send(accumulated_index);
+        }
+    });
+
+    (receiver, handle)
+}
+
+fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,
+                             output_dir: &Path)
+    -> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)
+{
+    let (sender, receiver) = channel();
+
+    let mut tmp_dir = TmpDir::new(output_dir);
+    let handle = spawn(move || {
+        for index in big_indexes {
+            let file = write_index_to_tmp_file(index, &mut tmp_dir)?;
+            if sender.send(file).is_err() {
+                break;
+            }
+        }
+        Ok(())
+    });
+
+    (receiver, handle)
+}
+
+fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
+    -> io::Result<()>
+{
+    let mut merge = FileMerge::new(output_dir);
+    for file in files {
+        merge.add_file(file)?;
+    }
+    merge.finish()
+}
+
+fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
+    -> io::Result<()>
+{
+    // Launch all five stages of the pipeline.
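+    // `texts` are raw file contents; `pints` are small single-document
+    // indexes; `gallons` are large in-memory indexes, merged until they're
+    // worth writing to disk.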
+    let (texts,   h1) = start_file_reader_thread(documents);
+    let (pints,   h2) = start_file_indexing_thread(texts);
+    let (gallons, h3) = start_in_memory_merge_thread(pints);
+    let (files,   h4) = start_index_writer_thread(gallons, &output_dir);
+    let result = merge_index_files(files, &output_dir);
+
+    // Wait for threads to finish, holding on to any errors that they encounter.
+    let r1 = h1.join().unwrap();
+    h2.join().unwrap();
+    h3.join().unwrap();
+    let r4 = h4.join().unwrap();
+
+    // Return the first error encountered, if any.
+    // (As it happens, h2 and h3 can't fail: those threads
+    // are pure in-memory data processing.)
+    r1?;
+    r4?;
+    result
+}
+
+fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
+    -> io::Result<()>
+{
+    let mut accumulated_index = InMemoryIndex::new();
+    let mut merge = FileMerge::new(&output_dir);
+    let mut tmp_dir = TmpDir::new(&output_dir);
+    for (doc_id, filename) in documents.into_iter().enumerate() {
+        let mut f = File::open(filename)?;
+        let mut text = String::new();
+        f.read_to_string(&mut text)?;
+
+        let index = InMemoryIndex::from_single_document(doc_id, text);
+        accumulated_index.merge(index);
+        if accumulated_index.is_large() {
+            let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
+            merge.add_file(file)?;
+            accumulated_index = InMemoryIndex::new();
+        }
+    }
+
+    if !accumulated_index.is_empty() {
+        let file = write_index_to_tmp_file(accumulated_index, &mut tmp_dir)?;
+        merge.add_file(file)?;
+    }
+    merge.finish()
+}
+
+fn expand_filename_arguments(args: Vec<String>) -> io::Result<Vec<PathBuf>> {
+    let mut filenames = vec![];
+    for arg in args {
+        let path = PathBuf::from(arg);
+        if path.metadata()?.is_dir() {
+            for entry in path.read_dir()? {
+                let entry = entry?;
+                if entry.file_type()?.is_file() {
+                    filenames.push(entry.path());
+                }
+            }
+        } else {
+            filenames.push(path);
+        }
+    }
+    Ok(filenames)
+}
+
+fn run(filenames: Vec<String>, single_threaded: bool) -> io::Result<()> {
+    let output_dir = PathBuf::from(".");
+    let documents = expand_filename_arguments(filenames)?;
+
+    if single_threaded {
+        run_single_threaded(documents, output_dir)
+    } else {
+        run_pipeline(documents, output_dir)
+    }
+}
+
+fn main() {
+    let mut single_threaded = false;
+    let mut filenames = vec![];
+
+    {
+        let mut ap = ArgumentParser::new();
+        ap.set_description("Make an inverted index for searching documents.");
+        ap.refer(&mut single_threaded)
+            .add_option(&["-1", "--single-threaded"], StoreTrue,
+                        "Do all the work on a single thread.");
+        ap.refer(&mut filenames)
+            .add_argument("filenames", Collect,
+                          "Names of files/directories to index. \
+                           For directories, all .txt files immediately \
+                           under the directory are indexed.");
+        ap.parse_args_or_exit();
+    }
+
+    match run(filenames, single_threaded) {
+        Ok(()) => {}
+        Err(err) => println!("error: {}", err.description())
+    }
+}
diff --git a/src/merge.rs b/src/merge.rs
new file mode 100644
index 0000000..038dfc5
--- /dev/null
+++ b/src/merge.rs
@@ -0,0 +1,132 @@
+use std::fs::{self, File};
+use std::io::{self, BufWriter};
+use std::mem;
+use std::path::{Path, PathBuf};
+
+use tmp::TmpDir;
+use read::IndexFileReader;
+use write::IndexFileWriter;
+
+pub struct FileMerge {
+    output_dir: PathBuf,
+    tmp_dir: TmpDir,
+    stacks: Vec<Vec<PathBuf>>
+}
+
+// How many files to merge at a time, at most.
+const NSTREAMS: usize = 8;
+
+const MERGED_FILENAME: &'static str = "index.dat";
+
+impl FileMerge {
+    pub fn new(output_dir: &Path) -> FileMerge {
+        FileMerge {
+            output_dir: output_dir.to_owned(),
+            tmp_dir: TmpDir::new(output_dir.to_owned()),
+            stacks: vec![]
+        }
+    }
+
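+    // Adding a file works like incrementing a base-NSTREAMS counter: each
+    // stack level holds at most NSTREAMS - 1 files at rest. When a push
+    // fills a level to NSTREAMS files, they're merged into a single file
+    // that carries up to the next level.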
+    pub fn add_file(&mut self, mut file: PathBuf) -> io::Result<()> {
+        let mut level = 0;
+        loop {
+            if level == self.stacks.len() {
+                self.stacks.push(vec![]);
+            }
+            self.stacks[level].push(file);
+            if self.stacks[level].len() < NSTREAMS {
+                break;
+            }
+            let (filename, out) = self.tmp_dir.create()?;
+            let mut to_merge = vec![];
+            mem::swap(&mut self.stacks[level], &mut to_merge);
+            merge_streams(to_merge, out)?;
+            file = filename;
+            level += 1;
+        }
+        Ok(())
+    }
+
+    pub fn finish(mut self) -> io::Result<()> {
+        let mut tmp = Vec::with_capacity(NSTREAMS);
+        for stack in self.stacks {
+            for file in stack.into_iter().rev() {
+                tmp.push(file);
+                if tmp.len() == NSTREAMS {
+                    merge_reversed(&mut tmp, &mut self.tmp_dir)?;
+                }
+            }
+        }
+
+        if tmp.len() > 1 {
+            merge_reversed(&mut tmp, &mut self.tmp_dir)?;
+        }
+        assert!(tmp.len() <= 1);
+        match tmp.pop() {
+            Some(last_file) =>
+                fs::rename(last_file, self.output_dir.join(MERGED_FILENAME)),
+            None =>
+                Err(io::Error::new(io::ErrorKind::Other,
+                                   "no documents were parsed or none contained any words"))
+        }
+    }
+}
+
+fn merge_streams(files: Vec<PathBuf>, out: BufWriter<File>)
+    -> io::Result<()>
+{
+    let mut streams: Vec<IndexFileReader> =
+        files.into_iter()
+            .map(IndexFileReader::open_and_delete)
+            .collect::<io::Result<_>>()?;
+
+    let mut output = IndexFileWriter::new(out)?;
+
+    let mut point: u64 = 0;
+    let mut count = streams.iter().filter(|s| s.peek().is_some()).count();
+    while count > 0 {
+        let mut term = None;
+        let mut nbytes = 0;
+        let mut df = 0;
+        for s in &streams {
+            match s.peek() {
+                None => {}
+                Some(entry) => {
+                    if term.is_none() || entry.term < *term.as_ref().unwrap() {
+                        term = Some(entry.term.clone()); // XXX LAME clone
+                        nbytes = entry.nbytes;
+                        df = entry.df;
+                    } else if entry.term == *term.as_ref().unwrap() {
+                        nbytes += entry.nbytes;
+                        df += entry.df;
+                    }
+                }
+            }
+        }
+        let term = term.expect("bug in algorithm!");
+
+        for s in &mut streams {
+            if s.is_at(&term) {
+                s.move_entry_to(&mut output)?;
+                if s.peek().is_none() {
+                    count -= 1;
+                }
+            }
+        }
+        output.write_contents_entry(term, df, point, nbytes as u64);
+        point += nbytes as u64;
+    }
+
+    assert!(streams.iter().all(|s| s.peek().is_none()));
+    output.finish()
+}
+
+fn merge_reversed(filenames: &mut Vec<PathBuf>, tmp_dir: &mut TmpDir) -> io::Result<()> {
+    filenames.reverse();
+    let (merged_filename, out) = tmp_dir.create()?;
+    let mut to_merge = Vec::with_capacity(NSTREAMS);
+    mem::swap(filenames, &mut to_merge);
+    merge_streams(to_merge, out)?;
+    filenames.push(merged_filename);
+    Ok(())
+}
diff --git a/src/read.rs b/src/read.rs
new file mode 100644
index 0000000..8faebc9
--- /dev/null
+++ b/src/read.rs
@@ -0,0 +1,115 @@
+use std::fs::{self, File};
+use std::io::prelude::*;
+use std::io::{self, BufReader, SeekFrom};
+use std::path::Path;
+use byteorder::{LittleEndian, ReadBytesExt};
+use write::IndexFileWriter;
+
+/// An `IndexFileReader` does a single linear pass over an index file from
+/// beginning to end. Needless to say, this is not how an index is normally
+/// used! This is only used when merging multiple index files.
+pub struct IndexFileReader {
+    main: BufReader<File>,
+    contents: BufReader<File>,
+    next: Option<Entry>
+}
+
+pub struct Entry {
+    pub term: String,
+    pub df: u32,
+    pub offset: u64,
+    pub nbytes: u64
+}
+
+impl IndexFileReader {
+    pub fn open_and_delete<P: AsRef<Path>>(filename: P) -> io::Result<IndexFileReader> {
+        let filename = filename.as_ref();
+        let mut main_raw = File::open(filename)?;
+
+        // Read the file header.
+        let contents_offset = main_raw.read_u64::<LittleEndian>()?;
+        println!("opened {}, table of contents starts at {}",
+                 filename.display(), contents_offset);
+
+        // Open again so we have two read heads;
+        // move the contents read head to its starting position.
+        // Set up buffering.
+        let mut contents_raw = File::open(filename)?;
+        contents_raw.seek(SeekFrom::Start(contents_offset))?;
+        let main = BufReader::new(main_raw);
+        let mut contents = BufReader::new(contents_raw);
+
+        // We always read ahead one entry, so load the first entry right away.
+        let first = IndexFileReader::read_entry(&mut contents)?;
+
+        fs::remove_file(filename)?; // YOLO
+
+        Ok(IndexFileReader {
+            main: main,
+            contents: contents,
+            next: first
+        })
+    }
+
+    fn read_entry(f: &mut BufReader<File>) -> io::Result<Option<Entry>> {
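+        // On disk, each table-of-contents entry is, in order (little-endian):
+        // entry offset (u64), length in bytes (u64), document frequency (u32),
+        // term length (u32), then the term's UTF-8 bytes. This matches the
+        // layout `IndexFileWriter::write_contents_entry` produces in src/write.rs.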
+        // If the first read here fails with `UnexpectedEof`,
+        // that's considered a success, with no entry read.
+        let offset = match f.read_u64::<LittleEndian>() {
+            Ok(value) => value,
+            Err(err) =>
+                if err.kind() == io::ErrorKind::UnexpectedEof {
+                    return Ok(None)
+                } else {
+                    return Err(err)
+                }
+        };
+
+        let nbytes = f.read_u64::<LittleEndian>()?;
+        let df = f.read_u32::<LittleEndian>()?;
+        let term_len = f.read_u32::<LittleEndian>()? as usize;
+        let mut bytes = Vec::with_capacity(term_len);
+        bytes.resize(term_len, 0);
+        f.read_exact(&mut bytes)?;
+        let term = match String::from_utf8(bytes) {
+            Ok(s) => s,
+            Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "unicode fail"))
+        };
+
+        Ok(Some(Entry {
+            term: term,
+            df: df,
+            offset: offset,
+            nbytes: nbytes
+        }))
+    }
+
+    pub fn peek(&self) -> Option<&Entry> { self.next.as_ref() }
+
+    pub fn is_at(&self, term: &str) -> bool {
+        match self.next {
+            Some(ref e) => e.term == term,
+            None => false
+        }
+    }
+
+    /// Copy the current entry to the specified output stream,
+    /// then read the header for the next entry.
+    pub fn move_entry_to(&mut self, out: &mut IndexFileWriter) -> io::Result<()> {
+        // This block limits the scope of borrowing `self.next` (for `e`),
+        // because after this block is over we'll want to assign to `self.next`.
+        {
+            let e = self.next.as_ref().expect("no entry to move");
+            if e.nbytes > usize::max_value() as u64 {
+                // This can only happen on 32-bit platforms.
+                return Err(io::Error::new(io::ErrorKind::Other,
+                                          "computer not big enough to hold index entry"));
+            }
+            let mut buf = Vec::with_capacity(e.nbytes as usize);
+            buf.resize(e.nbytes as usize, 0);
+            self.main.read_exact(&mut buf)?;
+            out.write_main(&buf)?;
+        }
+
+        self.next = Self::read_entry(&mut self.contents)?;
+        Ok(())
+    }
+}
diff --git a/src/tmp.rs b/src/tmp.rs
new file mode 100644
index 0000000..6a44525
--- /dev/null
+++ b/src/tmp.rs
@@ -0,0 +1,42 @@
+use std::io::{self, BufWriter};
+use std::fs::{self, File};
+use std::path::{Path, PathBuf};
+
+#[derive(Clone)]
+pub struct TmpDir {
+    dir: PathBuf,
+    n: usize
+}
+
+impl TmpDir {
+    pub fn new<P: AsRef<Path>>(dir: P) -> TmpDir {
+        TmpDir {
+            dir: dir.as_ref().to_owned(),
+            n: 1
+        }
+    }
+
+    pub fn create(&mut self) -> io::Result<(PathBuf, BufWriter<File>)> {
+        let mut try = 1;
+        loop {
+            let filename = self.dir.join(PathBuf::from(format!("tmp{:08x}.dat", self.n)));
+            self.n += 1;
+            match fs::OpenOptions::new()
+                .write(true)
+                .create_new(true)
+                .open(&filename)
+            {
+                Ok(f) =>
+                    return Ok((filename, BufWriter::new(f))),
+                Err(exc) =>
+                    if try < 999 && exc.kind() == io::ErrorKind::AlreadyExists {
+                        // keep going
+                    } else {
+                        return Err(exc);
+                    }
+            }
+            try += 1;
+        }
+    }
+}
diff --git a/src/write.rs b/src/write.rs
new file mode 100644
index 0000000..100f388
--- /dev/null
+++ b/src/write.rs
@@ -0,0 +1,73 @@
+use std::fs::File;
+use std::io::{self, BufWriter, SeekFrom};
+use std::io::prelude::*;
+use std::path::PathBuf;
+use index::InMemoryIndex;
+use tmp::TmpDir;
+use byteorder::{LittleEndian, WriteBytesExt};
+
+pub struct IndexFileWriter {
+    offset: u64,
+    writer: BufWriter<File>,
+    contents_buf: Vec<u8>
+}
+
+impl IndexFileWriter {
+    pub fn new(mut f: BufWriter<File>) -> io::Result<IndexFileWriter> {
+        const HEADER_SIZE: u64 = 8;
+        f.write_u64::<LittleEndian>(0)?;
+        Ok(IndexFileWriter {
+            offset: HEADER_SIZE,
+            writer: f,
+            contents_buf: vec![]
+        })
+    }
+
+    pub fn write_main(&mut self, buf: &[u8]) -> io::Result<()> {
+        self.writer.write_all(buf)?;
+        self.offset += buf.len() as u64;
+        Ok(())
+    }
+
+    pub fn write_contents_entry(&mut self, term: String, df: u32, offset: u64, nbytes: u64) {
+        self.contents_buf.write_u64::<LittleEndian>(offset).unwrap();
+        self.contents_buf.write_u64::<LittleEndian>(nbytes).unwrap();
+        self.contents_buf.write_u32::<LittleEndian>(df).unwrap();
+        let bytes = term.bytes();
+        self.contents_buf.write_u32::<LittleEndian>(bytes.len() as u32).unwrap();
+        self.contents_buf.extend(bytes);
+    }
+
+    pub fn finish(mut self) -> io::Result<()> {
+        let contents_start = self.offset;
+        self.writer.write_all(&self.contents_buf)?;
+        println!("{} bytes main, {} bytes total",
+                 contents_start,
+                 contents_start + self.contents_buf.len() as u64);
+        self.writer.seek(SeekFrom::Start(0))?;
+        self.writer.write_u64::<LittleEndian>(contents_start)?;
+        Ok(())
+    }
+}
+
+pub fn write_index_to_tmp_file(index: InMemoryIndex, tmp_dir: &mut TmpDir) -> io::Result<PathBuf> {
+    let (filename, f) = tmp_dir.create()?;
+    let mut writer = IndexFileWriter::new(f)?;
+
+    // The merge algorithm requires the entries within each file to be sorted by term.
+    // Sort before writing anything.
+    let mut index_as_vec: Vec<_> = index.map.into_iter().collect();
+    index_as_vec.sort_by(|&(ref a, _), &(ref b, _)| a.cmp(b));
+
+    for (term, hits) in index_as_vec {
+        let df = hits.len() as u32;
+        let start = writer.offset;
+        for buffer in hits {
+            writer.write_main(&buffer)?;
+        }
+        let stop = writer.offset;
+        writer.write_contents_entry(term, df, start, stop - start);
+    }
+
+    writer.finish()?;
+    println!("wrote file {:?}", filename);
+    Ok(filename)
+}