Today, we are going to look at one of the aspects where using streams is a real win – when we need to thread work. As well as parallel streams, we will also look at Spliterators, which act as the machinery that pushes elements into the pipeline.
Streams use a technique known as internal iteration. It’s internal because the Iterator (or in our case, Spliterator) that supplies work through our stream is hidden from us. To use a stream, all we need to do [once we have a source] is add the stages of the pipeline and supply the functions that these stages require. We don’t need to know how the data is being passed along the pipeline, just that it is. The benefit is that the workings are hidden from us and we can focus on the work that must be done rather than how it can be done.
The opposite, external iteration, is where we are given a loop variable or iterator, and we look up the value and pass it through the code ourselves. This gives us an obvious benefit: full control and low overhead. The downside is that we have to do all the work of looking up the values and passing them through the loop body. This also means more test code, and testing loop bodies properly can be tricky. With normal for-loops, we also have to be careful of off-by-one errors.
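To make the contrast concrete, here’s a minimal sketch (the class name IterationStyles is made up for illustration) that computes the same sum both ways:

import java.util.Arrays;
import java.util.List;

public class IterationStyles
{
    public static void main(String[] args)
    {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);

        // External iteration: we drive the loop and pass each value through ourselves
        int externalSum = 0;
        for (Integer v : values)
        {
            externalSum += v;
        }

        // Internal iteration: the stream machinery supplies the elements;
        // we only declare the stages of the pipeline
        int internalSum = values.stream()
                                .mapToInt(Integer::intValue)
                                .sum();

        System.out.println(externalSum + " " + internalSum); // 15 15
    }
}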
The question we need to ask ourselves when considering the iteration method is: do we really need absolute control for the task? Streams do some things really well but come with a small performance penalty. Perhaps a non-stream (or even non-Java) solution is more appropriate for high-performance work. On the other hand, sorting and filtering files to display in, say, a ‘recently accessed’ menu item doesn’t require high performance. In that case, we’d probably settle for an easy and quick way to do it rather than the best-performing one. Even if we go with a performant solution, some benchmarking will be necessary, as surprises often await. Thus we’re trading convenience off against performance, development time and risk of bugs.
Streams are easy to parallelise, as we’ll see. We just change the type of the stream to a parallel stream using the parallel() operator. To do this with external iteration is hard, because the loop is set up so that we get one item per iteration. The best we can do in that environment is pass work off to threads. To do things efficiently, we’d probably have to ditch looping through all the values in the outer loop and look at dividing the work up another way. We’ll see a way of doing this.
With that in mind, we’ll look at a prime number generator. First, a caveat: this is not the most efficient prime number generator. For a demonstration, it was useful to have an application that is well known, easy to understand, easy to implement with streams, and that takes a fair bit of computation time to complete.
Let’s look at the external iteration version first:
import java.util.HashSet;
import java.util.Set;
import java.util.stream.IntStream;

public class ForLoopPrimes
{
    // Collects the composite (non-prime) numbers up to maxPrimeTry;
    // the primes are whatever is left over
    public static Set<Integer> findPrimes(int maxPrimeTry)
    {
        Set<Integer> s = new HashSet<>();
        for (int i = 2; i <= maxPrimeTry; i++)
        {
            int maxJ = (int) Math.sqrt(i);
            for (int j = 2; j <= maxJ; j++)
            {
                // integer division trick: true when j divides i exactly
                if (i / j * j == i)
                {
                    s.add(i);
                    break;
                }
            }
        }
        return s;
    }

    public static void main(String[] args)
    {
        int maxPrimeTry = 4999999;
        long startTime = System.currentTimeMillis();
        Set<Integer> s = findPrimes(maxPrimeTry);
        long timeTaken = System.currentTimeMillis() - startTime;

        // Print everything that was not marked composite, i.e. the primes
        IntStream.rangeClosed(2, maxPrimeTry)
                 .filter(i -> !s.contains(i))
                 .forEach(System.out::println);
        System.out.println("Time taken: " + timeTaken);
    }
}
Note: since we only need to find one divisor, and multiplication is commutative, each pair of factors only needs to be tested once, via the smaller of the two. The smaller factor can’t be any bigger than the square root of the candidate prime and must be at least 2. For example, 36’s factor pairs are 2×18, 3×12, 4×9 and 6×6; in every pair the smaller factor is at most √36 = 6.
This is an example of a brute-force algorithm: we’re trying every combination rather than using any subtlety or optimisation. In this case, we’d also expect the external iteration version to run fast, since there is not a lot of work per iteration.
So why demonstrate this with streams at all?
Suppose we want to take advantage of the multiple cores in modern processors and thread this up. How might we do it? Up to Java 7, and certainly before Java 5, this would have been a real pain. We’d have to divide up the workload, maintain a pool of threads, signal them that there is work available, and then collect the work back from them when done. We’d probably also want to shut the worker threads down at the end once there is no more work to do. While it’s not rocket science, it can be hard to get right quickly, and subtle bugs can be hard to spot.
Java 7 makes this a lot easier with the ForkJoin framework, though it’s still tricky and easy to get wrong. We’ll use a RecursiveAction to break up the outer loop into pieces of work using a divide-and-conquer strategy. Note that parallel streams do this as well.
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.IntStream;

public class ForkJoinPrimes
{
    private static int workSize;
    private static Queue<Results> resultsQueue;

    // Immutable holder for one subtask's range and its set of composites
    private static class Results
    {
        public final int minPrimeTry;
        public final int maxPrimeTry;
        public final Set<Integer> resultSet;

        public Results(int minPrimeTry, int maxPrimeTry, Set<Integer> resultSet)
        {
            this.minPrimeTry = minPrimeTry;
            this.maxPrimeTry = maxPrimeTry;
            this.resultSet = resultSet;
        }
    }

    private static class FindPrimes extends RecursiveAction
    {
        private final int start;
        private final int end;

        public FindPrimes(int start, int end)
        {
            this.start = start;
            this.end = end;
        }

        // The same sequential trial division as before, over a sub-range
        private Set<Integer> findPrimes(int minPrimeTry, int maxPrimeTry)
        {
            Set<Integer> s = new HashSet<>();
            for (int i = minPrimeTry; i <= maxPrimeTry; i++)
            {
                int maxJ = (int) Math.sqrt(i);
                for (int j = 2; j <= maxJ; j++)
                {
                    if (i / j * j == i)
                    {
                        s.add(i);
                        break;
                    }
                }
            }
            return s;
        }

        @Override
        protected void compute()
        {
            if (end - start < workSize)
            {
                // Small enough: do the work directly and queue the results
                resultsQueue.offer(new Results(start, end,
                        findPrimes(start, end)));
            }
            else
            {
                // Too big: split the range in half and hand both halves
                // back to the pool (divide and conquer)
                int mid = (start + end) / 2;
                invokeAll(new FindPrimes(start, mid),
                          new FindPrimes(mid + 1, end));
            }
        }
    }

    public static void main(String[] args)
    {
        int maxPrimeTry = 4999999;
        int maxWorkDivisor = 8;
        workSize = (maxPrimeTry + 1) / maxWorkDivisor;
        ForkJoinPool pool = new ForkJoinPool();
        resultsQueue = new ConcurrentLinkedQueue<>();

        long startTime = System.currentTimeMillis();
        pool.invoke(new FindPrimes(2, maxPrimeTry));
        long timeTaken = System.currentTimeMillis() - startTime;

        System.out.println("Number of tasks executed: " + resultsQueue.size());
        while (resultsQueue.size() > 0)
        {
            Results results = resultsQueue.poll();
            Set<Integer> s = results.resultSet;
            IntStream.rangeClosed(results.minPrimeTry, results.maxPrimeTry)
                     .filter(i -> !s.contains(i))
                     .forEach(System.out::println);
        }
        System.out.println("Time taken: " + timeTaken);
    }
}
This is quite recognisable, since we have reused the sequential code to carry out the work in a subtask. We create two RecursiveActions to break the workload into two pieces, and keep breaking it down until the workload is below a certain size, at which point we carry out the action directly. We finally collect our results on a concurrent queue. Note that there is a fair bit of code.
Let’s look at a sequential Java 8 streams solution:
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SequentialStreamPrimes
{
    // As before, collects the composites; 0 stands in for "no factor found"
    public static Set<Integer> findPrimes(int maxPrimeTry)
    {
        return IntStream.rangeClosed(2, maxPrimeTry)
                        .map(i -> IntStream.rangeClosed(2, (int) (Math.sqrt(i)))
                                           .filter(j -> i / j * j == i)
                                           .map(j -> i)
                                           .findAny().orElse(0))
                        .mapToObj(x -> Integer.valueOf(x))
                        .collect(Collectors.toSet());
    }

    public static void main(String[] args)
    {
        int maxPrimeTry = 4999999;
        long startTime = System.currentTimeMillis();
        Set<Integer> s = findPrimes(maxPrimeTry);
        long timeTaken = System.currentTimeMillis() - startTime;

        IntStream.rangeClosed(2, maxPrimeTry)
                 .filter(i -> !s.contains(i))
                 .forEach(System.out::println);
        System.out.println("Time taken: " + timeTaken);
    }
}
We can see the streams solution matches up with the external iteration version quite well, except for a few tricks:

- In the inner loop, if we find a non-prime, we need to record the outer value (the candidate, i) rather than the inner one (the divisor, j), and so we use a mapping to do this.
- Since we only need one factor, we use findAny(). This acts like the break statement. findAny() returns an OptionalInt, so we need to unwrap it to get our value. If we have no value (i.e., we found a prime), we still need to store something, so we use orElse to convert it to a 0, which can be ignored later. This does give us a small bit of extra work, though.
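To see the findAny()/orElse() trick in isolation, here is a minimal sketch (the class name FindAnyDemo is made up for illustration):

import java.util.stream.IntStream;

public class FindAnyDemo
{
    public static void main(String[] args)
    {
        // A composite candidate: a factor exists, so findAny() yields the candidate
        int i = 15;
        int hit = IntStream.rangeClosed(2, (int) Math.sqrt(i))
                           .filter(j -> i / j * j == i)
                           .map(j -> i)
                           .findAny().orElse(0);
        System.out.println(hit); // prints 15

        // A prime candidate: no factor is found, so orElse(0) supplies the marker value
        int p = 13;
        int miss = IntStream.rangeClosed(2, (int) Math.sqrt(p))
                            .filter(j -> p / j * j == p)
                            .map(j -> p)
                            .findAny().orElse(0);
        System.out.println(miss); // prints 0
    }
}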
So let’s make it threaded. We only need to change the findPrimes method slightly:
public static Set<Integer> findPrimes(int maxPrimeTry)
{
    return IntStream.rangeClosed(2, maxPrimeTry)
                    .parallel()
                    .map(i -> IntStream.rangeClosed(2, (int) (Math.sqrt(i)))
                                       .filter(j -> i / j * j == i)
                                       .map(j -> i)
                                       .findAny().orElse(0))
                    .mapToObj(x -> Integer.valueOf(x))
                    .collect(Collectors.toSet());
}
This time, we don’t have to mess around with the algorithm. Simply by adding an intermediate stage, parallel(), to the stream, we make it divide up the work. parallel(), like filter and map, is an intermediate operation. Intermediate operations can change the behaviour of a stream as well as transform the values passing through it. Other intermediate stages we’ve not seen yet are:
- sequential() – makes the stream sequential
- distinct() – only distinct values pass
- sorted() – returns a sorted stream; optionally, we can pass a Comparator
- unordered() – returns an unordered stream
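As a quick illustration of two of these stages in one pipeline (the class name IntermediateOpsDemo is made up for illustration):

import java.util.stream.IntStream;

public class IntermediateOpsDemo
{
    public static void main(String[] args)
    {
        // distinct() lets each value through once; sorted() orders what remains
        IntStream.of(5, 3, 5, 1, 3, 2)
                 .distinct()
                 .sorted()
                 .forEach(System.out::println); // prints 1, 2, 3, 5
    }
}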
If we fire up jconsole while the program is running and look at the Threads tab, we can compare the sequential and parallel versions. In the parallel version, we can see several ForkJoin threads doing the work.
I did some timings and got the following results [note these are not completely accurate, since other tasks might have been running in the background on my machine]:
- External, sequential (for-loop): 8.5 seconds
- External, parallel (ForkJoin): 4 seconds
- Internal, sequential (sequential stream): 14 seconds
- Internal, parallel (parallel stream): 8 seconds
This is probably as expected. The amount of work per iteration in the inner loop is low, so any stream actions will have relatively high overhead. The parallel stream comes in slightly faster than the for-loop, but the ForkJoin version outperforms it by a factor of two. Note how simple the streams version was [once we get the hang of streams, of course] compared to the amount of code in the ForkJoin version.
Let’s have a look at the workhorse of this work distribution, the Spliterator. A Spliterator is an interface like Iterator, but instead of just providing the next value, it can also divide work up into smaller pieces which are executed by ForkJoinTasks.
When we create a Spliterator, we provide details of the size of the workload and the characteristics that the values have. Some types of Spliterator, such as RangeIntSpliterator [which IntStream’s range methods supply], use the characteristics() method to return their characteristics, rather than having them supplied via a constructor as AbstractSpliterator does.
We obviously need the size of the workload so we can divide the work up and know when to stop dividing. The characteristics we can supply are defined in the Spliterator interface as follows:
- SIZED – we can supply a specific number of values that will be sent prior to processing (versus an InfiniteSupplyingSpliterator).
- SUBSIZED – implies that any Spliterators that trySplit() creates will be SIZED and SUBSIZED. Not all SIZED Spliterators will split into SUBSIZED Spliterators; the API gives an example of a binary tree where we might know how many elements are in the tree, but not in the sub-trees.
- ORDERED – we supply the values in sequence, for example from a list.
- SORTED – the order follows a sort order (rather than sequence); ORDERED must also be set.
- DISTINCT – each value is different from every other, for example if we supply from a set.
- NONNULL – values coming from the source will not be null.
- IMMUTABLE – it’s impossible to change the source (such as adding or removing values). If this is not set and neither is CONCURRENT, we’re advised to check the documentation for what happens on modification (such as a ConcurrentModificationException).
- CONCURRENT – the source may safely be modified concurrently, and we’re advised to check the documentation on the policy.
These characteristics are used by the splitting machinery, for example in the ForEachOps class (which is used to carry out tasks in a pipeline terminated with a forEach). Normally, we can just use a pre-built Spliterator [and often we don't even need to worry about that, because it's supplied by the stream() method]. Remember, the streams framework allows us to get work done without having to know all the details of how it's being done. It’s only in the rare case of a special problem, or a need for maximum performance, that we have to worry.
Splitting is done by the trySplit() operation, which returns a new Spliterator covering some portion of the elements. Refer to the API documentation for the exact requirements of this method.
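A small sketch of splitting in action (the class name TrySplitDemo is made up for illustration; for this array-backed list the split succeeds, but in general trySplit() may return null):

import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class TrySplitDemo
{
    public static void main(String[] args)
    {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        Spliterator<Integer> rest = list.spliterator();
        // trySplit() hands a prefix of the elements to a new Spliterator,
        // while the original keeps the remainder
        Spliterator<Integer> prefix = rest.trySplit();

        prefix.forEachRemaining(i -> System.out.println("prefix: " + i)); // 1..4
        rest.forEachRemaining(i -> System.out.println("rest:   " + i));   // 5..8
    }
}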
When we consume the contents of [part of] the stream in bulk using the Spliterator, the forEachRemaining(action) operation is called. This takes the source data and invokes the next stage via the action’s accept call. For example, if the next operation is a filter, the accept call on the filter is called. This calls the test method of the contained predicate, and if that returns true, the accept method of the next stage is called. At some point, a terminal stage is reached [its accept method calls no other stage] and the final value is consumed, reduced or collected. When we call a stream() method, this pipeline is created, and calling intermediate stages chains them to the end of the pipeline. Calling the final consuming stage makes the final link and sets everything off.
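The chaining can be pictured with a deliberately simplified sketch; PipelineSketch and filterStage are invented for illustration and are not the JDK’s internal classes:

import java.util.function.IntConsumer;
import java.util.function.IntPredicate;

public class PipelineSketch
{
    // A filter stage: its accept tests the contained predicate and, only on
    // a match, calls the accept of the next (downstream) stage
    static IntConsumer filterStage(IntPredicate predicate, IntConsumer downstream)
    {
        return value ->
        {
            if (predicate.test(value))
                downstream.accept(value);
        };
    }

    public static void main(String[] args)
    {
        // Terminal stage: consumes the value and calls no further stage
        IntConsumer terminal = v -> System.out.println("consumed " + v);

        // Chaining an intermediate stage links it in front of the terminal stage
        IntConsumer pipeline = filterStage(v -> v % 2 == 0, terminal);

        // The spliterator side pushes each element in via accept
        for (int i = 1; i <= 6; i++)
            pipeline.accept(i); // prints: consumed 2, consumed 4, consumed 6
    }
}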
Alternatively, when we need to generate each element from a non-bulk source, the tryAdvance() function is used. This is passed an action on which accept is called, as before; however, we return true if we want to continue and false if we don’t. InfiniteSupplyingSpliterator, for example, always returns true, but we can use an AbstractSpliterator if we want to control this. Remember the AbstractIntSpliterator from our SixGame in the finite generators article? One of our tryAdvance functions was this:
@Override
public boolean tryAdvance(IntConsumer action)
{
    if (action == null)
        throw new NullPointerException();
    if (done)
        return false;
    action.accept(rollDie());
    return true;
}
In this case, if we roll the die, we always continue. This allows the done flag to be set from elsewhere if we don’t want to roll the die again. It might have been slightly better to return !done instead of true, to terminate generation immediately as soon as the six was thrown; however, in this case, going through another cycle was hardly a chore.
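For context, a self-contained stand-in for that spliterator might look like the following (DieSpliterator and the inlined die-rolling logic are assumptions for illustration, not the original SixGame code):

import java.util.Spliterators;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class DieRolls
{
    private static class DieSpliterator extends Spliterators.AbstractIntSpliterator
    {
        private boolean done;

        DieSpliterator()
        {
            super(Long.MAX_VALUE, NONNULL); // size unknown, values never null
        }

        @Override
        public boolean tryAdvance(IntConsumer action)
        {
            if (action == null)
                throw new NullPointerException();
            if (done)
                return false;
            int roll = ThreadLocalRandom.current().nextInt(1, 7);
            done = (roll == 6); // the next call returns false, ending the stream
            action.accept(roll);
            return true;
        }
    }

    public static void main(String[] args)
    {
        // Build an IntStream over the spliterator; false = sequential
        IntStream rolls = StreamSupport.intStream(new DieSpliterator(), false);
        rolls.forEach(System.out::println); // rolls up to and including the first six
    }
}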
That’s it for the streams overview. In the next article, we’ll look a bit more at lambda expressions.