package edu.rice.hj.example.comp322; import edu.rice.hj.runtime.mapreduce.MapReduce; import edu.rice.hj.runtime.mapreduce.MapTask; import java.io.File; import java.io.FileReader; import java.io.IOException; public class WordCount extends MapReduce { public static void main(final String[] args) { if (args.length < 3) { System.out.println("Usage: run [report?] "); return; } final boolean report = (args.length > 3); final String fileName = args[0]; final int numMapTasks = Integer.parseInt(args[1]); final int numReduceTasks = Integer.parseInt(args[2]); final WordCount wc = new WordCount(fileName, numMapTasks, numReduceTasks); final long startTime = System.nanoTime(); // Run instance of MapReduce wc.run(); final long execTime = System.nanoTime() - startTime; System.out.println("Total time taken by MapReduce job = " + (float) execTime / 1e9 + " seconds"); System.out.println("Number of unique words counted = " + wc.getKeys().size()); if (report) { wc.report(); } } private final int _size; private final char[] text; private final int _chunkSize; public WordCount(final String fileName, final int numMapTasks, final int numReduceTasks) { // Specify number of tasks to execute map and reduce tasks respectively super(numMapTasks, numReduceTasks); final File _file = new File(fileName); _size = (int) _file.length(); _chunkSize = (_size + numMapTasks - 1) / numMapTasks; // Allocate file size to avoid index-out-of-bounds complications in MapReduceBase text = new char[_size]; try { final FileReader _reader = new FileReader(fileName); _reader.read(text, 0, _size); // Read entire file into text array } catch (IOException e) { e.printStackTrace(); } } private int _startPos = 0; private int _endPos = -1; /** * Each call to splitter() produces a new map task, until the last call which returns null */ public MapTask splitter(final int taskId) { if (_endPos >= _size - 1) { return null; } // Heuristic: divide up file into _numMapTasks chunks // NOTE: we overlook the issue that this heuristic may split a word in the middle _endPos = Math.min(_startPos + _chunkSize - 1, _size - 1); final WordCountMapTask task = new WordCountMapTask(taskId, _startPos, _endPos); _startPos = _endPos + 1; return task; } public void report() { System.out.println("Reporting..."); System.out.println("First 100 entries: "); int index = 0; final Key[] keys = this.getKeys().toArray(new Key[0]); for (int i = 0; i < keys.length; i++) { System.out.println(" " + keys[i] + " : " + this.query(keys[i])); index++; if (index > 100) { break; } } } /** * Define an instance of MapTask that overrides the map() function, * and makes multiple calls to emitIntermediate() to emit intermediate key-value pairs. */ private class WordCountMapTask extends MapTask { private static final int NOT_IN_WORD = 0; private static final int IN_WORD = 1; private int _startPos; private int _endPos; WordCountMapTask(final int id, final int startPos, final int endPos) { super(WordCount.this, id); _startPos = startPos; _endPos = endPos; } public void map() { int state = NOT_IN_WORD; int curr_start = -1; int p = _startPos; while (p <= _endPos && Character.isLetter(text[p])) { p++; } for (; p <= _endPos; p++) { final char ch = Character.toUpperCase(text[p]); if (state == IN_WORD) { text[p] = ch; if (Character.isLetter(ch) == false && ch != '\'') { emitIntermediate(new Key(text, curr_start, p - 1), new Integer(1)); state = NOT_IN_WORD; } } else { if (Character.isLetter(ch)) { text[p] = ch; state = IN_WORD; curr_start = p; } } } if (state == IN_WORD) { while ((Character.isLetter(text[p]) || Character.toUpperCase(text[p]) == '\'')) { final char ch = Character.toUpperCase(text[p]); text[p] = ch; p++; } emitIntermediate(new Key(text, curr_start, p - 1), new Integer(1)); } } public String toString() { return "From " + _startPos + " to " + _endPos; } } // class WordCountMapTask /** * The reduce function. */ public Integer reduce(final Integer v1, final Integer v2) { // cast input arguments v1 and v2 to Integer, // compute their sum, and // return an Integer object containing the sum return v1 + v2; } } /** * A Key instance consists of two ints, startPos and endPos */ final class Key { public final char[] text; public final int startPos; public final int endPos; private int hash = 0; public String toString() { String s = ""; for (int i = startPos; i <= endPos; i++) { s = s + text[i]; } return s + "(" + startPos + "-" + endPos + ")"; } Key(final char[] text, final int startPos, final int endPos) { this.text = text; this.startPos = startPos; this.endPos = endPos; } public int hashCode() { if (hash == 0) { hash = 5381; for (int i = startPos; i <= endPos; i++) { hash = ((hash << 5) + hash) + (text[i]); /* hash * 33 + c */ } } return hash; } public boolean equals(final Object o) { if (!(o instanceof Key)) { return false; } final Key k = (Key) o; if (k.endPos - k.startPos != endPos - startPos) { return false; } for (int i = startPos; i <= endPos; i++) { if (text[i] != text[k.startPos - startPos + i]) { return false; } } return true; } }