Java Multithreading with java.util.concurrent

As we know from the previous Multithreading post, working with threads can be challenging, especially when those threads need to coordinate and communicate. Fortunately, Java has some really nice built-in libraries for dealing with many common multithreading situations.

The first place you’ll want to look if you’re thinking of doing multithreading is the java.util.concurrent package. It has been around since Java 1.5, and contains lots of useful classes.

There’s too many of them to cover in one blog post, so today we’re just going to build an interesting example using the Executors class, BlockingQueue<T>, and the concept of Communicating Sequential Processes (CSP).

Distributing the Work with the ExecutorService

Say you have 100,000 CPU-intensive tasks, and your computer has 8 cores, which means it could run 8 tasks at the same time (aka “in parallel”). If you run them all in one thread, you only get 1/8 the speed. When you create threads, Java will run them on the available cores for you.

However, you don’t want to create 100,000 separate threads all at once, because it would use up a lot of system resources, be hard to manage, and run slowly due to the CPU having to bounce back and forth between all the threads constantly.

A better way would be to create 8 threads, one per core, and distribute the tasks to them as they complete. This is known as a “thread pool”, and it gives you better parallelism and efficiency, but would be complicated to do manually. With Executors, you can do it in one line of code with Executors.newFixedThreadPool(8):

ExecutorService execService = Executors.newFixedThreadPool(8); // 8 threads
for (int i = 0; i < 100000, i++) {
  // you can submit a Runnable, or even a Callable and get a Future
  // object representing the result
  execService.submit(() -> doIntensiveTask(a, b, c));
}
execService.shutdown(); // starts the shutdown process
execService.awaitTermination(2, TimeUnit.HOURS); // Blocks w/timeout

We can see that an Executor is an object that runs Runnable tasks, and the ExecutorService lets you control those tasks. It generally handles creating Threads for you as necessary, and lets you separate the decision of how many threads to run and when, from the logic of your program.

Communicating Between Threads with a BlockingQueue

So now we can distribute work to our threads easily, but what if we need some of them to communicate with each other?

One option is for the communicating classes to directly call methods on each other, but this can be very tricky to get right, because you need to make sure you properly use the synchronized keyword and make your class threadsafe.

One way to avoid these complications is to communicate by passing messages between threads. The different threads listen for messages, and are in charge of making changes to themselves in response. Those threads can also post messages to make other threads take action.

For example, instead of one thread calling .incrementCounter(3) directly on another thread, it might do queue.put(new IncrementCounterMessage(3)) and have the other thread increment itself when it gets that message.

If your code will only make modifications to itself in a single thread, you don’t have to worry about making it threadsafe! This pattern is sometimes also known as the Actor model, but you don’t have to know the theory to use it!

The way to do this in Java is by using a BlockingQueue – it represents a FIFO (first in, first out) queue of values, is itself thread-safe, and allows for blocking on both producing and consuming processes. If you give your threads the same queue, they can pass messages between each other in a safe way. Here’s an example of the queue in action (on one thread):

// Create a queue that can have up to 2 elements in it
BlockingQueue queue = new ArrayBlockingQueue<>(2);
queue.put("Hello");
queue.put("World");
// queue.put("blocked!"); // this would block the thread, since the queue is currently full
String hello = queue.take();
String world = queue.take(); 

There are lots of ways we can use queues to communicate between threads, and they have their plusses and minuses. We’re going to show how you can build a pipeline of independent processes that only have to worry about doing their one task. This lets each step in the pipeline be decoupled from the others, and allows the Java libraries to handle all the synchronization.

The name for this type of communication is called “Communicating Sequential Processes”, or CSP for short. It lets you write your processes as if they were sequential (single threaded), but dependent on some other data, and have the underlying system schedule the different processes to run at the right time.

Putting it all Together with an Example

For example, let’s say we have 3 processes that each generate a unique number every so often, and run in parallel. However, we also want to sum all the numbers and print them as they come in.

Diagram of Threads communicating via a queue

In the above diagram, each box is its own thread managed by the ExecutorService. Each NumberGenerator outputs its number into the queue, and the SummingProcess removes them from the queue and adds them up, printing as it goes.

Even though each process is essentially in an infinite loop doing one task, together they communicate to accomplish the parallel summing task – and the queue does all the synchronization for us! Here’s the runnable code that matches the diagram – try the snippet out in ideone:

import java.util.concurrent.*;

class CSPExample {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        // First we create a queue with capacity 1. This means that if there
        // is already an item in the queue waiting to be consumed, any other
        // threads wanting to add an item are blocked until it is consumed.
        // Also, if the consumer tries to get an element out of the queue, but
        // there aren't any items, the consumer will block.
        BlockingQueue queue = new ArrayBlockingQueue<>(1);

        // We create an executor service, which runs our number generators and
        // our summing process. They all get the same queue so they can
        // communicate with each other via messages.
        ExecutorService threadPool = Executors.newFixedThreadPool(4);

        // The NumberGenerators generate different numbers, and wait for different
        // periods of time.
        threadPool.submit(new NumberGenerator(5, 700, queue));
        threadPool.submit(new NumberGenerator(7, 650, queue));
        threadPool.submit(new NumberGenerator(11, 400, queue));

        // The SummingProcess returns the final sum, so we can get a Future
        // that represents the answer at a future time and wait for it to finish.
        Future totalSum = threadPool.submit(new SummingProcess(queue));

        // Waits for the SummingProcess to finish, after it's sum is > 100
        Integer sumResult = totalSum.get();

        System.out.println("Done! Sum was " + sumResult);

        // Interrupts the other threads for shutdown. You can also shutdown
        // threads more gracefully with shutdown() and awaitTermination(),
        // but here we just want them to exit immediately.
        threadPool.shutdownNow();
    }

    private static final class NumberGenerator implements Runnable {

        private final int theNumberToGenerate;
        private final int sleepPeriod;
        private final BlockingQueue queue;

        public NumberGenerator(int theNumberToGenerate, int sleepPeriod, BlockingQueue queue) {
            this.theNumberToGenerate = theNumberToGenerate;
            this.sleepPeriod = sleepPeriod;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                // Produce numbers indefinitely
                while (true) {
                    Thread.sleep(sleepPeriod);

                    // puts the integer into the queue, waiting as necessary for
                    // there to be space.
                    queue.put(theNumberToGenerate);
                }
            } catch (InterruptedException e) {
                // Allow our thread to be interrupted
                Thread.currentThread().interrupt();
            }
        }
    }

    private static final class SummingProcess implements Callable {

        private final BlockingQueue queue;

        public SummingProcess(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public Integer call() {
            try {
                int sum = 0;
                while (sum < 100) {
                    // take() gets the next item from the queue, waiting as necessary
                    // for there to be elements.
                    int nextInteger = queue.take();
                    sum += nextInteger;
                    System.out.println("Got " + nextInteger + ", total is " + sum);
                }
                return sum;
            } catch (InterruptedException e) {
                // Allow our thread to be interrupted
                Thread.currentThread().interrupt();
                return -1; // this will never run, but the compiler needs it
            }
        }
    }
}

If you run it, you can see that the numbers being output all come in unordered and irregularly, but the summing and printing always happens sequentially.

Doing this without the queue wouldn’t be very easy. We could have each thread add it’s number to an AtomicInteger to do the summing, but writing the sum to the console would be hard. This is because each threads’ code might run out-of-order, and the sum would sometimes appear to decrease randomly!

In the above solution, the printing is only happening on one thread, so we know it will always be in the proper order, and we didn’t have to worry about synchronization!

Conclusion

What did we learn here? Mainly that java.util.concurrent is a great place to look for concurrency primitives that make your life easier, and that you can use them to coordinate our multithreaded programs in a controlled way, all without writing a lot of dangerous threading code!

These concepts are pretty advanced, so don’t worry if it doesn’t all click in your first pass through. If you have any questions or suggestions, let me know in the comments below!

About David Ackerman

David Ackerman portraitDavid is a full stack engineer with over 7 years of professional experience in Java and JavaScript. He’s passionate about learning, and teaches everything he knows about development at Fullstack Industries.