Browse Source

Code for blog post.

master
jrtechs 4 years ago
commit
87401acda3
17 changed files with 580 additions and 0 deletions
  1. +2
    -0
      .gitignore
  2. +92
    -0
      pom.xml
  3. +22
    -0
      src/main/java/net/jrtechs/DoMaths.java
  4. +14
    -0
      src/main/java/net/jrtechs/DoNothing.java
  5. +30
    -0
      src/main/java/net/jrtechs/GenericTester.java
  6. +66
    -0
      src/main/java/net/jrtechs/Manager.java
  7. +10
    -0
      src/main/java/net/jrtechs/ParallelExecutor.java
  8. +16
    -0
      src/main/java/net/jrtechs/ParallelStreamsExecutor.java
  9. +42
    -0
      src/main/java/net/jrtechs/PythonGraphingConversion.java
  10. +21
    -0
      src/main/java/net/jrtechs/Result.java
  11. +27
    -0
      src/main/java/net/jrtechs/RunThreads.java
  12. +17
    -0
      src/main/java/net/jrtechs/SingleThread.java
  13. +19
    -0
      src/main/java/net/jrtechs/SleepWork.java
  14. +140
    -0
      src/main/java/net/jrtechs/Test.java
  15. +45
    -0
      src/main/java/net/jrtechs/ThreadPoolExecutor.java
  16. +6
    -0
      src/main/java/net/jrtechs/Work.java
  17. +11
    -0
      src/main/java/net/jrtechs/WorkGenerator.java

+ 2
- 0
.gitignore View File

@ -0,0 +1,2 @@
# Project exclude paths
/target/

+ 92
- 0
pom.xml View File

@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.jrtechs</groupId>
<artifactId>parallel-java-performance-overview</artifactId>
<version>1.0-SNAPSHOT</version>
<name>parallel-java-performance-overview</name>
<url>https://jrtechs.net/java/parallel-java-performance-overview</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

+ 22
- 0
src/main/java/net/jrtechs/DoMaths.java View File

@ -0,0 +1,22 @@
package net.jrtechs;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
public class DoMaths<E> extends WorkGenerator<Double>
{
@Override
Work<Double> generateWork(Double param) {
return new Work<Double>() {
@Override
Double runTask()
{
return IntStream.range(0, (int)Math.round(param))
.boxed()
.map(i -> Math.sin(i * ThreadLocalRandom.current().nextDouble()))
.mapToDouble(java.lang.Double::doubleValue)
.sum();
}
};
}
}

+ 14
- 0
src/main/java/net/jrtechs/DoNothing.java View File

@ -0,0 +1,14 @@
package net.jrtechs;
public class DoNothing<E> extends WorkGenerator<E>
{
@Override
Work<E> generateWork(E param) {
return new Work<E>() {
@Override
E runTask() {
return param;
}
};
}
}

+ 30
- 0
src/main/java/net/jrtechs/GenericTester.java View File

@ -0,0 +1,30 @@
package net.jrtechs;
import java.util.Vector;
public class GenericTester<E>
{
public long timeTrialMS(ParallelExecutor<E> executor, Vector<Work<E>> tasks)
{
long start = System.nanoTime();
executor.runTasks(tasks);
long finish = System.nanoTime();
return (finish-start)/1000000;
}
public Result testAll(Vector<Work<E>> tasks)
{
ParallelExecutor<E> streams = new ParallelStreamsExecutor<>();
ParallelExecutor<E> threads = new RunThreads<>();
ParallelExecutor<E> manager = new Manager<>(8);
ParallelExecutor<E> single = new SingleThread<>();
ParallelExecutor<E> pool = new ThreadPoolExecutor<>();
Result res = new Result();
res.streams = timeTrialMS(streams, tasks);
res.manager = timeTrialMS(manager, tasks);
res.threads = timeTrialMS(threads, tasks);
res.pool = timeTrialMS(pool, tasks);
res.singleThread = timeTrialMS(single, tasks);
return res;
}
}

+ 66
- 0
src/main/java/net/jrtechs/Manager.java View File

@ -0,0 +1,66 @@
package net.jrtechs;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Manager<E> extends ParallelExecutor<E>
{
/** Number of threads to use at once */
private final int threadCount;
public Manager(int threadCount)
{
this.threadCount = threadCount;
}
/**
* This is the fun method.
*
* This will run all of the tasks in parallel using the
* desired amount of threads until all of the jobs are
* complete.
* @return
* @param tasks
*/
public List<E> runTasks(Vector<Work<E>> tasks)
{
List<E> results = new Vector<>();
Queue<Integer> taskQueue = new LinkedList<>();
taskQueue.addAll(IntStream.range(0, tasks.size())
.boxed().collect(Collectors.toList()));
int desiredThreads = Math.min(threadCount, tasks.size());
Thread[] runners = new Thread[desiredThreads];
for(int i = 0; i < desiredThreads; i++)
{
runners[i] = new Thread(()->
{
Work<E> t;
while(true)
{
Integer nextTask;
synchronized (taskQueue)
{
nextTask = taskQueue.poll();
}
if(nextTask == null)
return;
results.add(tasks.get(nextTask).runTask());
}
});
runners[i].start();
}
for(Thread t: runners)
{
try
{
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return results;
}
}

+ 10
- 0
src/main/java/net/jrtechs/ParallelExecutor.java View File

@ -0,0 +1,10 @@
package net.jrtechs;
import java.util.Collection;
import java.util.List;
import java.util.Vector;
public abstract class ParallelExecutor<E>
{
public abstract List<E> runTasks(Vector<Work<E>> tasks);
}

+ 16
- 0
src/main/java/net/jrtechs/ParallelStreamsExecutor.java View File

@ -0,0 +1,16 @@
package net.jrtechs;
import java.util.List;
import java.util.Vector;
import java.util.stream.Collectors;
public class ParallelStreamsExecutor<E> extends ParallelExecutor<E>
{
@Override
public List<E> runTasks(Vector<Work<E>> tasks)
{
return tasks.parallelStream()
.map(Work::runTask)
.collect(Collectors.toList());
}
}

+ 42
- 0
src/main/java/net/jrtechs/PythonGraphingConversion.java View File

@ -0,0 +1,42 @@
package net.jrtechs;
import java.util.ArrayList;
import java.util.List;
public class PythonGraphingConversion
{
List<Long> single;
List<Long> threaded;
List<Long> manager;
List<Long> streams;
List<Integer> size;
public PythonGraphingConversion()
{
single = new ArrayList<>();
threaded = new ArrayList<>();
manager = new ArrayList<>();
streams = new ArrayList<>();
size = new ArrayList<>();
}
public void addPoint(Result res, int i)
{
size.add(i);
single.add(res.singleThread);
threaded.add(res.threads);
manager.add(res.manager);
streams.add(res.streams);
}
public void printPythonCode(String title)
{
System.out.println(String.format("single = %s", this.single.toString()));
System.out.println(String.format("threads = %s", this.threaded.toString()));
System.out.println(String.format("manager = %s", this.manager.toString()));
System.out.println(String.format("streams = %s", this.streams.toString()));
System.out.println(String.format("sizes = %s", this.size.toString()));
System.out.println(String.format("plot_result(single, threads, manager, streams, sizes, title='%s')", title));
}
}

+ 21
- 0
src/main/java/net/jrtechs/Result.java View File

@ -0,0 +1,21 @@
package net.jrtechs;
public class Result
{
long streams;
long threads;
long manager;
long singleThread;
long pool;
@Override
public String toString() {
return "Result{" +
"streams=" + streams +
", threads=" + threads +
", manager=" + manager +
", singleThread=" + singleThread +
", pool=" + pool +
'}';
}
}

+ 27
- 0
src/main/java/net/jrtechs/RunThreads.java View File

@ -0,0 +1,27 @@
package net.jrtechs;
import java.util.List;
import java.util.Vector;
import java.util.stream.Collectors;
public class RunThreads<E> extends ParallelExecutor<E>
{
@Override
public List<E> runTasks(Vector<Work<E>> tasks)
{
List<E> results = new Vector<>();
List<Thread> threads = tasks.stream()
.map(task ->
new Thread(() -> results.add(task.runTask())))
.collect(Collectors.toList());
threads.forEach(Thread::start);
threads.forEach(t-> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
return results;
}
}

+ 17
- 0
src/main/java/net/jrtechs/SingleThread.java View File

@ -0,0 +1,17 @@
package net.jrtechs;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import java.util.stream.Collectors;
public class SingleThread<E> extends ParallelExecutor<E>
{
@Override
public List<E> runTasks(Vector<Work<E>> tasks)
{
return tasks.stream()
.map(Work::runTask)
.collect(Collectors.toList());
}
}

+ 19
- 0
src/main/java/net/jrtechs/SleepWork.java View File

@ -0,0 +1,19 @@
package net.jrtechs;
public class SleepWork<E> extends WorkGenerator<E>
{
@Override
Work<E> generateWork(E param) {
return new Work<E>() {
@Override
E runTask() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return param;
}
};
}
}

+ 140
- 0
src/main/java/net/jrtechs/Test.java View File

@ -0,0 +1,140 @@
package net.jrtechs;
import java.sql.SQLOutput;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Test
{
public static void runDoNothingTest(int max, int incrementer)
{
WorkGenerator<Object> generator = new DoNothing<>();
PythonGraphingConversion result = new PythonGraphingConversion();
Vector<Work<Object>> workAll = new Vector<>();
workAll.addAll(
IntStream.range(0, max)
.boxed()
.map(i -> generator.generateWork(i))
.collect(Collectors.toList())
);
GenericTester<Object> genObjectTester = new GenericTester<>();
for(int i = 1; i <= max; i+= incrementer)
{
Vector<Work<Object>> work = new Vector<>(workAll.subList(0, i));
Result res = genObjectTester.testAll(work);
result.addPoint(res, i);
}
result.printPythonCode("Operational Overhead");
}
public static void sleepTest(int max, int incrementer)
{
WorkGenerator<Object> generator = new SleepWork<>();
PythonGraphingConversion result = new PythonGraphingConversion();
Vector<Work<Object>> workAll = new Vector<>();
workAll.addAll(
IntStream.range(0, max)
.boxed()
.map(i -> generator.generateWork(i))
.collect(Collectors.toList())
);
GenericTester<Object> genObjectTester = new GenericTester<>();
for(int i = 1; i <= max; i+= incrementer)
{
Vector<Work<Object>> work = new Vector<>(workAll.subList(0, i));
Result res = genObjectTester.testAll(work);
result.addPoint(res, i);
}
result.printPythonCode("Sleeping Tasks");
}
public static void arithmeticWork(int max, int incrementer)
{
WorkGenerator<Double> generator = new DoMaths<>();
PythonGraphingConversion result = new PythonGraphingConversion();
Vector<Work<Double>> workAll = new Vector<>();
workAll.addAll(
IntStream.range(0, max)
.boxed()
.map(i -> generator.generateWork(i*1.0))
.collect(Collectors.toList())
);
GenericTester<Double> doubleGenericTester = new GenericTester<>();
for(int i = 1; i <= max; i+= incrementer)
{
Vector<Work<Double>> work = new Vector<>(workAll.subList(0, i));
Result res = doubleGenericTester.testAll(work);
result.addPoint(res, i);
}
result.printPythonCode("Complex Maths");
}
public static void main(String[] arguments)
{
//sleepTest(50, 5);
arithmeticWork(10000, 20);
// runDoNothingTest(10000, 100);
// Vector<Work<Object>> work = new Vector<>();
// work.addAll(
// IntStream.range(0, 100000).boxed()
// .map(i -> new Work<Object>() {
// @Override
// Object runTask() {
// //System.out.println("running task");
// return i;
// }
// }
// ).collect(Collectors.toList())
// );
// System.out.println(testAll(work));
//
// Vector<Work<Object>> work = new Vector<>();
// work.addAll(
// IntStream.range(0, 10000).boxed()
// .map(i -> new Work<Object>() {
// @Override
// Object runTask() {
// for(int z = 0; z < 10000; z++)
// {
// Object o = Math.sin(z * ThreadLocalRandom.current().nextDouble());
// }
// return ThreadLocalRandom.current().nextDouble() * Math.PI;
// //return i;
// }
// }
// ).collect(Collectors.toList())
// );
// GenericTester<Object> genericTester = new GenericTester<>();
// System.out.println(genericTester.testAll(work));
// Vector<Work<Object>> work = new Vector<>();
// work.addAll(
// IntStream.range(0, 10).boxed()
// .map(i -> new Work<Object>() {
// @Override
// Object runTask() {
// try {
// Thread.sleep(500);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// return ThreadLocalRandom.current().nextDouble() * Math.PI;
// //return i;
// }
// }
// ).collect(Collectors.toList())
// );
// System.out.println(testAllThree(work));
}
}

+ 45
- 0
src/main/java/net/jrtechs/ThreadPoolExecutor.java View File

@ -0,0 +1,45 @@
package net.jrtechs;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class ThreadPoolExecutor<E> extends ParallelExecutor<E>
{
@Override
public List<E> runTasks(Vector<Work<E>> tasks)
{
ExecutorService executor = Executors.newCachedThreadPool();
List<Callable<E>> callables = new ArrayList<Callable<E>>();
for(Work<E> work: tasks)
{
Callable<E> c = new Callable<E>() {
@Override
public E call() throws Exception {
return work.runTask();
}
};
callables.add(c);
}
List<E> results = new ArrayList<>();
try
{
List<Future<E>> futures = executor.invokeAll(callables);
for(Future<E> future: futures)
{
try {
results.add(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
return results;
}
}

+ 6
- 0
src/main/java/net/jrtechs/Work.java View File

@ -0,0 +1,6 @@
package net.jrtechs;
public abstract class Work<E>
{
abstract E runTask();
}

+ 11
- 0
src/main/java/net/jrtechs/WorkGenerator.java View File

@ -0,0 +1,11 @@
package net.jrtechs;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
public abstract class WorkGenerator<E>
{
abstract Work<E> generateWork(E param);
}

Loading…
Cancel
Save