package edu.rice.comp322; import edu.rice.hj.api.HjFinishAccumulator; import edu.rice.hj.api.HjOperator; import edu.rice.hj.api.HjProcedure; import edu.rice.hj.api.SuspendableException; import edu.rice.hj.runtime.util.Pair; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; import java.util.concurrent.atomic.AtomicInteger; import static edu.rice.hj.Module0.finish; import static edu.rice.hj.Module0.launchHabaneroApp; import static edu.rice.hj.Module0.newFinishAccumulator; import static edu.rice.hj.Module1.async; /** * Four-way Array sum example. */ public class ArraySumFourWay { /** * Constant ERROR_MSG="Incorrect argument for array size". */ public static final String ERROR_MSG = "Incorrect argument for array size (should be > 0), assuming default n"; /** * Constant DEFAULT_N=10_000_000. */ public static final int DEFAULT_N = 10_000_000; /** * Constant THRESHOLD=50_000. */ public static final int THRESHOLD = 50_000; /** *

main.

* * @param args an array of {@link String} objects. */ public static void main(final String[] args) throws Exception { // Initialization int n = parseLengthArgument(args); final double[] X = initializeArray(n); System.out.println("ArraySumFourWay.main: starting..."); for (int numRun = 0; numRun < 5; numRun++) { System.out.printf(" Run %d\n", numRun); timeExecution(() -> seqArraySum(X, 0, X.length), (p) -> printResults("seqArraySum", p.right, p.left)); timeExecution(() -> parArraySumForkJoin(X, 0, X.length), (p) -> printResults("parArraySumForkJoin", p.right, p.left)); timeExecution(() -> parArraySumHabaneroJava(X, 0, X.length), (p) -> printResults("parArraySumHabaneroJava", p.right, p.left)); } System.out.println("ArraySumFourWay.main: ended."); } protected static int parseLengthArgument(final String[] argv) { int n; if (argv.length != 0) { try { n = Integer.parseInt(argv[0]); if (n <= 0) { // Bad value of n System.out.println(ERROR_MSG); n = DEFAULT_N; } } catch (Throwable e) { System.out.println(ERROR_MSG); n = DEFAULT_N; } } else { // argv.length == 0 n = DEFAULT_N; } return n; } protected static double[] initializeArray(final int n) { final double[] X = new double[n]; final Random myRand = new Random(n); for (int i = 0; i < n; i++) { X[i] = myRand.nextInt(n); if (X[i] == 0.0) { i--; } } return X; } protected static Pair timeExecution( final Callable body, final HjProcedure> callback) throws Exception { final long startTime = System.nanoTime(); final Double result = body.call(); final long endTime = System.nanoTime(); final long execTimeInNanos = endTime - startTime; final Pair resultPair = Pair.factory(result, execTimeInNanos); if (callback != null) { callback.apply(resultPair); } return resultPair; } protected static void printResults(final String name, final long timeInNanos, final double sum) { System.out.printf(" %s completed in %8.3f milliseconds, with sum = %10.6f \n", name, timeInNanos / 1e6, sum); } /** *

seqArraySum.

* * @param xArray an array of double. * @return a double. */ protected static double seqArraySum(final double[] xArray, final int start, final int end) { double sum = 0; // Compute sum of array elements doing some work (sin * cos) for (int i = start; i < end; i++) { sum += (Math.sin(xArray[i]) * Math.cos(xArray[i])); } return sum; } /** *

parArraySumHabaneroJava.

*

* NOTE: you will need to add a "throws SuspendableException" clause to this method signature when it contains a * finish or any other blocking API. *

* * @param xArray an array of double. * @return a double. */ protected static void parArraySumHabaneroJava( final double[] xArray, final int start, final int end, final HjFinishAccumulator acc) throws SuspendableException { final int fragmentSize = end - start; if (fragmentSize < THRESHOLD) { // sequential threshold cutoff final double sum = seqArraySum(xArray, start, end); acc.put(sum); } else { final int numSplits = 4; final int segmentSize = fragmentSize / 4; // a 4-way split with 4 asyncs to do work in parallel. for (int nn = 1; nn <= numSplits; nn++) { final int startIndex = start + ((nn - 1) * segmentSize); final int endIndex = (nn == numSplits) ? end : (startIndex + segmentSize); async(() -> { parArraySumHabaneroJava(xArray, startIndex, endIndex, acc); }); } } } /** *

parArraySumHabaneroJava.

*

* NOTE: you will need to add a "throws SuspendableException" clause to this method signature when it contains a * finish or any other blocking API. *

* * @param xArray an array of double. * @return a double. */ protected static double parArraySumHabaneroJava( final double[] xArray, final int start, final int end) { final double[] sum = {0.0}; // HjSystemProperty.showRuntimeStats.set(true); launchHabaneroApp(() -> { final HjFinishAccumulator acc = newFinishAccumulator(HjOperator.SUM, Double.class); finish(acc, () -> { parArraySumHabaneroJava(xArray, start, end, acc); }); sum[0] = acc.get().doubleValue(); }); return sum[0]; } protected static class ArraySumForkJoinTask extends RecursiveTask { private static AtomicInteger taskCounter = new AtomicInteger(0); private final double[] xArray; private final int start; private final int end; public ArraySumForkJoinTask(double[] xArray, int start, int end) { this.xArray = xArray; this.start = start; this.end = end; taskCounter.incrementAndGet(); } @Override protected Double compute() { final int fragmentSize = end - start; if (fragmentSize < THRESHOLD) { // sequential threshold cutoff return seqArraySum(xArray, start, end); } else { final int numSplits = 4; final int segmentSize = fragmentSize / 4; final List> tasks = new ArrayList<>(numSplits); double sum = 0; // a 4-way split with 4 asyncs to do work in parallel. for (int nn = 1; nn <= numSplits; nn++) { final int startIndex = start + ((nn - 1) * segmentSize); final int endIndex = (nn == numSplits) ? end : (startIndex + segmentSize); final RecursiveTask task = new ArraySumForkJoinTask(xArray, startIndex, endIndex); tasks.add(task); } invokeAll(tasks); for (final RecursiveTask task : tasks) { sum += task.join(); } return sum; } } } /** *

parArraySumHabaneroJava.

*

* NOTE: you will need to add a "throws SuspendableException" clause to this method signature when it contains a * finish or any other blocking API. *

* * @param xArray an array of double. * @return a double. */ protected static double parArraySumForkJoin(final double[] xArray, final int start, final int end) { final ForkJoinPool commonPool = ForkJoinPool.commonPool(); final ArraySumForkJoinTask task = new ArraySumForkJoinTask(xArray, start, end); final Double result = commonPool.invoke(task); // System.out.println("Num tasks: " + ArraySumForkJoinTask.taskCounter.get()); // System.out.println("Thread count: " + commonPool.getActiveThreadCount()); // System.out.println("Num steals: " + commonPool.getStealCount()); // System.out.println("Pool size: " + commonPool.getPoolSize()); return result; } }