From 87401acda39feffd5f7dd251ab82d1962f2bcee6 Mon Sep 17 00:00:00 2001 From: jrtechs Date: Fri, 7 Aug 2020 20:16:17 -0400 Subject: [PATCH] Code for blog post. --- .gitignore | 2 + pom.xml | 92 ++++++++++++ src/main/java/net/jrtechs/DoMaths.java | 22 +++ src/main/java/net/jrtechs/DoNothing.java | 14 ++ src/main/java/net/jrtechs/GenericTester.java | 30 ++++ src/main/java/net/jrtechs/Manager.java | 66 +++++++++ .../java/net/jrtechs/ParallelExecutor.java | 10 ++ .../net/jrtechs/ParallelStreamsExecutor.java | 16 ++ .../net/jrtechs/PythonGraphingConversion.java | 42 ++++++ src/main/java/net/jrtechs/Result.java | 21 +++ src/main/java/net/jrtechs/RunThreads.java | 27 ++++ src/main/java/net/jrtechs/SingleThread.java | 17 +++ src/main/java/net/jrtechs/SleepWork.java | 19 +++ src/main/java/net/jrtechs/Test.java | 140 ++++++++++++++++++ .../java/net/jrtechs/ThreadPoolExecutor.java | 45 ++++++ src/main/java/net/jrtechs/Work.java | 6 + src/main/java/net/jrtechs/WorkGenerator.java | 11 ++ 17 files changed, 580 insertions(+) create mode 100644 .gitignore create mode 100644 pom.xml create mode 100644 src/main/java/net/jrtechs/DoMaths.java create mode 100644 src/main/java/net/jrtechs/DoNothing.java create mode 100644 src/main/java/net/jrtechs/GenericTester.java create mode 100644 src/main/java/net/jrtechs/Manager.java create mode 100644 src/main/java/net/jrtechs/ParallelExecutor.java create mode 100644 src/main/java/net/jrtechs/ParallelStreamsExecutor.java create mode 100644 src/main/java/net/jrtechs/PythonGraphingConversion.java create mode 100644 src/main/java/net/jrtechs/Result.java create mode 100644 src/main/java/net/jrtechs/RunThreads.java create mode 100644 src/main/java/net/jrtechs/SingleThread.java create mode 100644 src/main/java/net/jrtechs/SleepWork.java create mode 100644 src/main/java/net/jrtechs/Test.java create mode 100644 src/main/java/net/jrtechs/ThreadPoolExecutor.java create mode 100644 src/main/java/net/jrtechs/Work.java create mode 100644 src/main/java/net/jrtechs/WorkGenerator.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..744289d --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +# Project exclude paths +/target/ \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..d997157 --- /dev/null +++ b/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + net.jrtechs + parallel-java-performance-overview + 1.0-SNAPSHOT + + parallel-java-performance-overview + https://jrtechs.net/java/parallel-java-performance-overview + + + UTF-8 + 1.8 + 1.8 + + + + + junit + junit + 4.11 + test + + + + + + + + + maven-clean-plugin + 3.1.0 + + + + maven-resources-plugin + 3.0.2 + + + maven-compiler-plugin + 3.8.0 + + + maven-surefire-plugin + 2.22.1 + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + 2.8.2 + + + + maven-site-plugin + 3.7.1 + + + maven-project-info-reports-plugin + 3.0.0 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 7 + 7 + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + diff --git a/src/main/java/net/jrtechs/DoMaths.java b/src/main/java/net/jrtechs/DoMaths.java new file mode 100644 index 0000000..7e95efa --- /dev/null +++ b/src/main/java/net/jrtechs/DoMaths.java @@ -0,0 +1,22 @@ +package net.jrtechs; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.IntStream; + +public class DoMaths extends WorkGenerator +{ + @Override + Work generateWork(Double param) { + return new Work() { + @Override + Double runTask() + { + return IntStream.range(0, (int)Math.round(param)) + .boxed() + .map(i -> Math.sin(i * ThreadLocalRandom.current().nextDouble())) + .mapToDouble(java.lang.Double::doubleValue) + .sum(); + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/net/jrtechs/DoNothing.java b/src/main/java/net/jrtechs/DoNothing.java new file mode 100644 index 0000000..c0dc994 --- /dev/null +++ b/src/main/java/net/jrtechs/DoNothing.java @@ -0,0 +1,14 @@ +package net.jrtechs; + +public class DoNothing extends WorkGenerator +{ + @Override + Work generateWork(E param) { + return new Work() { + @Override + E runTask() { + return param; + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/net/jrtechs/GenericTester.java b/src/main/java/net/jrtechs/GenericTester.java new file mode 100644 index 0000000..39737c9 --- /dev/null +++ b/src/main/java/net/jrtechs/GenericTester.java @@ -0,0 +1,30 @@ +package net.jrtechs; + +import java.util.Vector; + +public class GenericTester +{ + public long timeTrialMS(ParallelExecutor executor, Vector> tasks) + { + long start = System.nanoTime(); + executor.runTasks(tasks); + long finish = System.nanoTime(); + return (finish-start)/1000000; + } + + public Result testAll(Vector> tasks) + { + ParallelExecutor streams = new ParallelStreamsExecutor<>(); + ParallelExecutor threads = new RunThreads<>(); + ParallelExecutor manager = new Manager<>(8); + ParallelExecutor single = new SingleThread<>(); + ParallelExecutor pool = new ThreadPoolExecutor<>(); + Result res = new Result(); + res.streams = timeTrialMS(streams, tasks); + res.manager = timeTrialMS(manager, tasks); + res.threads = timeTrialMS(threads, tasks); + res.pool = timeTrialMS(pool, tasks); + res.singleThread = timeTrialMS(single, tasks); + return res; + } +} diff --git a/src/main/java/net/jrtechs/Manager.java b/src/main/java/net/jrtechs/Manager.java new file mode 100644 index 0000000..9b2aa41 --- /dev/null +++ b/src/main/java/net/jrtechs/Manager.java @@ -0,0 +1,66 @@ +package net.jrtechs; + +import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + + +public class Manager extends ParallelExecutor +{ + /** Number of threads to use at once */ + private final int threadCount; + + public Manager(int threadCount) + { + this.threadCount = threadCount; + } + + /** + * This is the fun method. + * + * This will run all of the tasks in parallel using the + * desired amount of threads until all of the jobs are + * complete. + * @return + * @param tasks + */ + public List runTasks(Vector> tasks) + { + List results = new Vector<>(); + Queue taskQueue = new LinkedList<>(); + taskQueue.addAll(IntStream.range(0, tasks.size()) + .boxed().collect(Collectors.toList())); + int desiredThreads = Math.min(threadCount, tasks.size()); + Thread[] runners = new Thread[desiredThreads]; + for(int i = 0; i < desiredThreads; i++) + { + runners[i] = new Thread(()-> + { + Work t; + while(true) + { + Integer nextTask; + synchronized (taskQueue) + { + nextTask = taskQueue.poll(); + } + if(nextTask == null) + return; + results.add(tasks.get(nextTask).runTask()); + } + }); + runners[i].start(); + } + for(Thread t: runners) + { + try + { + t.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return results; + } +} diff --git a/src/main/java/net/jrtechs/ParallelExecutor.java b/src/main/java/net/jrtechs/ParallelExecutor.java new file mode 100644 index 0000000..6c03ec3 --- /dev/null +++ b/src/main/java/net/jrtechs/ParallelExecutor.java @@ -0,0 +1,10 @@ +package net.jrtechs; + +import java.util.Collection; +import java.util.List; +import java.util.Vector; + +public abstract class ParallelExecutor +{ + public abstract List runTasks(Vector> tasks); +} diff --git a/src/main/java/net/jrtechs/ParallelStreamsExecutor.java b/src/main/java/net/jrtechs/ParallelStreamsExecutor.java new file mode 100644 index 0000000..bc868f1 --- /dev/null +++ b/src/main/java/net/jrtechs/ParallelStreamsExecutor.java @@ -0,0 +1,16 @@ +package net.jrtechs; + +import java.util.List; +import java.util.Vector; +import java.util.stream.Collectors; + +public class ParallelStreamsExecutor extends ParallelExecutor +{ + @Override + public List runTasks(Vector> tasks) + { + return tasks.parallelStream() + .map(Work::runTask) + .collect(Collectors.toList()); + } +} diff --git a/src/main/java/net/jrtechs/PythonGraphingConversion.java b/src/main/java/net/jrtechs/PythonGraphingConversion.java new file mode 100644 index 0000000..fe75607 --- /dev/null +++ b/src/main/java/net/jrtechs/PythonGraphingConversion.java @@ -0,0 +1,42 @@ +package net.jrtechs; + +import java.util.ArrayList; +import java.util.List; + +public class PythonGraphingConversion +{ + List single; + List threaded; + List manager; + List streams; + List size; + + public PythonGraphingConversion() + { + single = new ArrayList<>(); + threaded = new ArrayList<>(); + manager = new ArrayList<>(); + streams = new ArrayList<>(); + size = new ArrayList<>(); + } + + public void addPoint(Result res, int i) + { + size.add(i); + single.add(res.singleThread); + threaded.add(res.threads); + manager.add(res.manager); + streams.add(res.streams); + } + + + public void printPythonCode(String title) + { + System.out.println(String.format("single = %s", this.single.toString())); + System.out.println(String.format("threads = %s", this.threaded.toString())); + System.out.println(String.format("manager = %s", this.manager.toString())); + System.out.println(String.format("streams = %s", this.streams.toString())); + System.out.println(String.format("sizes = %s", this.size.toString())); + System.out.println(String.format("plot_result(single, threads, manager, streams, sizes, title='%s')", title)); + } +} diff --git a/src/main/java/net/jrtechs/Result.java b/src/main/java/net/jrtechs/Result.java new file mode 100644 index 0000000..fd8aff2 --- /dev/null +++ b/src/main/java/net/jrtechs/Result.java @@ -0,0 +1,21 @@ +package net.jrtechs; + +public class Result +{ + long streams; + long threads; + long manager; + long singleThread; + long pool; + + @Override + public String toString() { + return "Result{" + + "streams=" + streams + + ", threads=" + threads + + ", manager=" + manager + + ", singleThread=" + singleThread + + ", pool=" + pool + + '}'; + } +} diff --git a/src/main/java/net/jrtechs/RunThreads.java b/src/main/java/net/jrtechs/RunThreads.java new file mode 100644 index 0000000..24af82e --- /dev/null +++ b/src/main/java/net/jrtechs/RunThreads.java @@ -0,0 +1,27 @@ +package net.jrtechs; + +import java.util.List; +import java.util.Vector; +import java.util.stream.Collectors; + +public class RunThreads extends ParallelExecutor +{ + @Override + public List runTasks(Vector> tasks) + { + List results = new Vector<>(); + List threads = tasks.stream() + .map(task -> + new Thread(() -> results.add(task.runTask()))) + .collect(Collectors.toList()); + threads.forEach(Thread::start); + threads.forEach(t-> { + try { + t.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + return results; + } +} diff --git a/src/main/java/net/jrtechs/SingleThread.java b/src/main/java/net/jrtechs/SingleThread.java new file mode 100644 index 0000000..5cdc14a --- /dev/null +++ b/src/main/java/net/jrtechs/SingleThread.java @@ -0,0 +1,17 @@ +package net.jrtechs; + +import java.util.LinkedList; +import java.util.List; +import java.util.Vector; +import java.util.stream.Collectors; + +public class SingleThread extends ParallelExecutor +{ + @Override + public List runTasks(Vector> tasks) + { + return tasks.stream() + .map(Work::runTask) + .collect(Collectors.toList()); + } +} diff --git a/src/main/java/net/jrtechs/SleepWork.java b/src/main/java/net/jrtechs/SleepWork.java new file mode 100644 index 0000000..932cfbf --- /dev/null +++ b/src/main/java/net/jrtechs/SleepWork.java @@ -0,0 +1,19 @@ +package net.jrtechs; + +public class SleepWork extends WorkGenerator +{ + @Override + Work generateWork(E param) { + return new Work() { + @Override + E runTask() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return param; + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/net/jrtechs/Test.java b/src/main/java/net/jrtechs/Test.java new file mode 100644 index 0000000..0e27f3d --- /dev/null +++ b/src/main/java/net/jrtechs/Test.java @@ -0,0 +1,140 @@ +package net.jrtechs; + +import java.sql.SQLOutput; +import java.util.List; +import java.util.Vector; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class Test +{ + + public static void runDoNothingTest(int max, int incrementer) + { + WorkGenerator generator = new DoNothing<>(); + PythonGraphingConversion result = new PythonGraphingConversion(); + + Vector> workAll = new Vector<>(); + workAll.addAll( + IntStream.range(0, max) + .boxed() + .map(i -> generator.generateWork(i)) + .collect(Collectors.toList()) + ); + GenericTester genObjectTester = new GenericTester<>(); + for(int i = 1; i <= max; i+= incrementer) + { + Vector> work = new Vector<>(workAll.subList(0, i)); + Result res = genObjectTester.testAll(work); + result.addPoint(res, i); + } + result.printPythonCode("Operational Overhead"); + } + + + public static void sleepTest(int max, int incrementer) + { + WorkGenerator generator = new SleepWork<>(); + PythonGraphingConversion result = new PythonGraphingConversion(); + + Vector> workAll = new Vector<>(); + workAll.addAll( + IntStream.range(0, max) + .boxed() + .map(i -> generator.generateWork(i)) + .collect(Collectors.toList()) + ); + GenericTester genObjectTester = new GenericTester<>(); + for(int i = 1; i <= max; i+= incrementer) + { + Vector> work = new Vector<>(workAll.subList(0, i)); + Result res = genObjectTester.testAll(work); + result.addPoint(res, i); + } + result.printPythonCode("Sleeping Tasks"); + } + + public static void arithmeticWork(int max, int incrementer) + { + WorkGenerator generator = new DoMaths<>(); + PythonGraphingConversion result = new PythonGraphingConversion(); + + Vector> workAll = new Vector<>(); + workAll.addAll( + IntStream.range(0, max) + .boxed() + .map(i -> generator.generateWork(i*1.0)) + .collect(Collectors.toList()) + ); + GenericTester doubleGenericTester = new GenericTester<>(); + for(int i = 1; i <= max; i+= incrementer) + { + Vector> work = new Vector<>(workAll.subList(0, i)); + Result res = doubleGenericTester.testAll(work); + result.addPoint(res, i); + } + result.printPythonCode("Complex Maths"); + } + + public static void main(String[] arguments) + { + //sleepTest(50, 5); + arithmeticWork(10000, 20); +// runDoNothingTest(10000, 100); +// Vector> work = new Vector<>(); +// work.addAll( +// IntStream.range(0, 100000).boxed() +// .map(i -> new Work() { +// @Override +// Object runTask() { +// //System.out.println("running task"); +// return i; +// } +// } +// ).collect(Collectors.toList()) +// ); +// System.out.println(testAll(work)); + +// +// Vector> work = new Vector<>(); +// work.addAll( +// IntStream.range(0, 10000).boxed() +// .map(i -> new Work() { +// @Override +// Object runTask() { +// for(int z = 0; z < 10000; z++) +// { +// Object o = Math.sin(z * ThreadLocalRandom.current().nextDouble()); +// } +// return ThreadLocalRandom.current().nextDouble() * Math.PI; +// //return i; +// } +// } +// ).collect(Collectors.toList()) +// ); +// GenericTester genericTester = new GenericTester<>(); +// System.out.println(genericTester.testAll(work)); + + + +// Vector> work = new Vector<>(); +// work.addAll( +// IntStream.range(0, 10).boxed() +// .map(i -> new Work() { +// @Override +// Object runTask() { +// try { +// Thread.sleep(500); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// return ThreadLocalRandom.current().nextDouble() * Math.PI; +// //return i; +// } +// } +// ).collect(Collectors.toList()) +// ); +// System.out.println(testAllThree(work)); + } +} diff --git a/src/main/java/net/jrtechs/ThreadPoolExecutor.java b/src/main/java/net/jrtechs/ThreadPoolExecutor.java new file mode 100644 index 0000000..ca945de --- /dev/null +++ b/src/main/java/net/jrtechs/ThreadPoolExecutor.java @@ -0,0 +1,45 @@ +package net.jrtechs; + +import java.util.ArrayList; +import java.util.List; +import java.util.Vector; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +public class ThreadPoolExecutor extends ParallelExecutor +{ + @Override + public List runTasks(Vector> tasks) + { + ExecutorService executor = Executors.newCachedThreadPool(); + List> callables = new ArrayList>(); + for(Work work: tasks) + { + Callable c = new Callable() { + @Override + public E call() throws Exception { + return work.runTask(); + } + }; + callables.add(c); + } + List results = new ArrayList<>(); + try + { + List> futures = executor.invokeAll(callables); + for(Future future: futures) + { + try { + results.add(future.get()); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + return results; + } +} diff --git a/src/main/java/net/jrtechs/Work.java b/src/main/java/net/jrtechs/Work.java new file mode 100644 index 0000000..14dfd98 --- /dev/null +++ b/src/main/java/net/jrtechs/Work.java @@ -0,0 +1,6 @@ +package net.jrtechs; + +public abstract class Work +{ + abstract E runTask(); +} diff --git a/src/main/java/net/jrtechs/WorkGenerator.java b/src/main/java/net/jrtechs/WorkGenerator.java new file mode 100644 index 0000000..1da1643 --- /dev/null +++ b/src/main/java/net/jrtechs/WorkGenerator.java @@ -0,0 +1,11 @@ +package net.jrtechs; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.IntStream; + + +public abstract class WorkGenerator +{ + abstract Work generateWork(E param); + +}