Monday, September 13, 2010

Threads in Java are not simple

To prepare for an upcoming real-time processing project, I needed an education in Java threads. I wanted to build a simple producer/consumer project in which each producer and each consumer runs in its own thread. There would be multiple producers and consumers, and possibly multiple items in the work queue.

This was supposed to be easy, as Java was designed with threading in mind. What follows is my experience, and then an explanation of the costly scheduling that the sanctioned examples produce. The length of this post is a testament to the simplicity of implementing synchronized threads in Java.

Working my way through the Java tutorials, I found this example. The heart of the example is the Drop class: it is the resource that multiple threads will access. Through a shared instance of the Drop class, the Producers convey messages to the Consumers.

The Drop class with the commentary removed:

Drop.java
    public class Drop {
        private String message;
        private boolean empty = true;

        public synchronized String take() {
            while (empty) {
                try {
                    wait();
                } catch (InterruptedException e) {}
            }
            empty = true;
            notifyAll();
            return message;
        }

        public synchronized void put(String message) {
            while (!empty) {
                try { 
                    wait();
                } catch (InterruptedException e) {}
            }
            empty = false;
            this.message = message;
            notifyAll();
        }
    }

The purpose of the Drop class is to prevent concurrent access to its message field. It achieves this with the Drop object's intrinsic lock (also called the monitor lock, or simply the monitor).


A thread obtains the lock by entering a synchronized method. Any other thread that calls a synchronized method on the same object blocks until the lock is released. In this example the lock can be released in two ways: the synchronized call returns, or the thread calls wait().
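To see the intrinsic lock directly, here is a small sketch of my own (IntrinsicLockDemo is a made-up class, not part of the tutorial). It relies on Thread.holdsLock(Object), which reports whether the current thread holds a given object's monitor, and it shows that a synchronized method locks this, the same lock an explicit synchronized (this) block uses.

```java
// Sketch: a synchronized method and a synchronized (this) block use the
// same intrinsic lock. IntrinsicLockDemo is a hypothetical class.
class IntrinsicLockDemo {
    // The intrinsic lock of `this` is held for the whole call.
    synchronized boolean insideSynchronizedMethod() {
        return Thread.holdsLock(this);
    }

    // Equivalent form: an explicit synchronized block on `this`.
    boolean insideSynchronizedBlock() {
        synchronized (this) {
            return Thread.holdsLock(this);
        }
    }

    public static void main(String[] args) {
        IntrinsicLockDemo demo = new IntrinsicLockDemo();
        System.out.println("inside method: " + demo.insideSynchronizedMethod());
        System.out.println("inside block:  " + demo.insideSynchronizedBlock());
        // Once the synchronized call has returned, the lock is released.
        System.out.println("after return:  " + Thread.holdsLock(demo));
    }
}
```

Running main prints true, true, false: the lock is held only for the duration of the synchronized call.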


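To understand the process, we need to understand wait() and notifyAll(). When a thread calls wait(), it releases the lock and blocks until another thread calls notify() or notifyAll() on the same object; notifyAll() wakes every thread waiting on that object. A woken thread eventually returns normally from wait(), and no exception is involved. The InterruptedException that the tutorial code catches is thrown only if some thread calls interrupt() on the waiting thread.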
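A quick way to check what notifyAll() actually does to a waiting thread is a sketch like the one below. NotifyVsInterrupt, awaitReady and run are hypothetical names of mine; the only library calls are Object.wait(), Object.notifyAll(), Thread.interrupt() and Thread.getState(). The waiter reports whether its wait() ended normally or with an InterruptedException.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch: how a thread blocked in wait() comes back out.
// NotifyVsInterrupt is a hypothetical class, not from the tutorial.
class NotifyVsInterrupt {
    private final Object lock = new Object();
    private boolean ready = false; // guarded by lock

    // Waits until `ready` is true and reports how wait() ended.
    String awaitReady() {
        synchronized (lock) {
            try {
                while (!ready) {
                    lock.wait();
                }
                return "returned normally";
            } catch (InterruptedException e) {
                return "InterruptedException";
            }
        }
    }

    // Parks a waiter in wait(), then either notifies it or interrupts it.
    static String run(boolean interruptInstead) {
        NotifyVsInterrupt demo = new NotifyVsInterrupt();
        AtomicReference<String> outcome = new AtomicReference<>();
        Thread waiter = new Thread(() -> outcome.set(demo.awaitReady()));
        try {
            waiter.start();
            while (waiter.getState() != Thread.State.WAITING) {
                Thread.sleep(1); // let the waiter reach wait()
            }
            if (interruptInstead) {
                waiter.interrupt(); // the only way wait() throws here
            } else {
                synchronized (demo.lock) {
                    demo.ready = true;
                    demo.lock.notifyAll(); // wakes the waiter, throws nothing at it
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println("after notifyAll(): " + run(false));
        System.out.println("after interrupt(): " + run(true));
    }
}
```

main prints "after notifyAll(): returned normally" followed by "after interrupt(): InterruptedException". The polling on Thread.State.WAITING is only there to make sure the waiter really is inside wait() before it gets poked.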

It's important to understand that a thread can block at two levels: when it tries to enter a synchronized method without holding the intrinsic lock, and while it is inside wait().
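Java exposes both levels through Thread.getState(): a thread parked in wait() reports WAITING, and a thread stuck trying to enter a synchronized block or method reports BLOCKED. This sketch (BlockedVsWaiting is my own illustrative class) parks one thread in wait(), then has main take the same lock, which only works because wait() released it, and starts a second thread that has to queue for that lock.

```java
// Sketch: the two levels of blocking, as reported by Thread.getState().
// BlockedVsWaiting is a hypothetical class, not from the tutorial.
class BlockedVsWaiting {
    private final Object lock = new Object();

    // Returns {state of a thread inside wait(), state of a thread trying to enter}.
    static Thread.State[] observe() {
        BlockedVsWaiting demo = new BlockedVsWaiting();
        Thread waiter = new Thread(() -> {
            synchronized (demo.lock) {
                try {
                    demo.lock.wait(); // releases demo.lock while parked
                } catch (InterruptedException e) {
                    // interrupt() is used below only to end the demo
                }
            }
        });
        Thread entrant = new Thread(() -> {
            synchronized (demo.lock) {
                // Nothing to do: this thread only needs to acquire the lock.
            }
        });
        Thread.State waiterState;
        Thread.State entrantState;
        try {
            waiter.start();
            awaitState(waiter, Thread.State.WAITING);
            // main can take the lock even though the waiter is "inside" a
            // synchronized block: its wait() released the lock.
            synchronized (demo.lock) {
                entrant.start();
                awaitState(entrant, Thread.State.BLOCKED);
                waiterState = waiter.getState();
                entrantState = entrant.getState();
            }
            waiter.interrupt();
            waiter.join();
            entrant.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new Thread.State[] {waiterState, entrantState};
    }

    // Polls until thread t reaches the target state.
    private static void awaitState(Thread t, Thread.State target)
            throws InterruptedException {
        while (t.getState() != target) {
            Thread.sleep(1);
        }
    }

    public static void main(String[] args) {
        Thread.State[] s = observe();
        System.out.println("thread in wait():         " + s[0]);
        System.out.println("thread entering the lock: " + s[1]);
    }
}
```

It should print WAITING for the first thread and BLOCKED for the second (barring a rare spurious wakeup of the waiter during the short observation window).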


In the Drop class, the procedure for storing or retrieving the message is:

  1. Check whether the message is empty
  2. If it is not in the state we need (empty for put(), full for take()), wait()
    • When woken, go to 1.
  3. Store or retrieve the message
  4. Update empty
  5. Notify all waiting threads

My Producer and Consumer classes differ from those in the example: they run forever and identify themselves by their thread names. Producers produce messages endlessly, and Consumers read those messages forever, printing them to System.out.

Producer.java
    import java.util.Random;

    public class Producer implements Runnable {
        private Drop drop;

        public Producer(Drop drop) {
            this.drop = drop;
        }

        public void run() {
            Random random = new Random();
            String pre = Thread.currentThread().getName()
                       + " widget #: ";
            int i = 1;

            while (true) {
                drop.put(pre + i++);
                try {
                    Thread.sleep(random.nextInt(5000));
                } catch (InterruptedException e) { }
            }
        }
    }

Consumer.java
    import java.util.Random;

    public class Consumer implements Runnable {
        private Drop drop;

        public Consumer(Drop drop) {
            this.drop = drop;
        }

        public void run() {
            Random random = new Random();
            while (true) {
                String message = drop.take();
                System.out.println(
                    Thread.currentThread().getName() + 
                    " MESSAGE RECEIVED: " + message);
                try {
                    Thread.sleep(random.nextInt(5000));
                } catch (InterruptedException e) {}
            }
        }
    }

Lastly, a driver is needed to start the producers and consumers. To ease the scheduling explanation there will be one Producer and four Consumers.

ProducerConsumerExample.java
    public class ProducerConsumerExample {
        public static void main(String[] args) {
            Drop drop = new Drop();
            (new Thread(new Producer(drop))).start();
            (new Thread(new Consumer(drop))).start();
            (new Thread(new Consumer(drop))).start();
            (new Thread(new Consumer(drop))).start();
            (new Thread(new Consumer(drop))).start();
        }
    }

That's a lot of code, and a long introduction. Now we're ready to look at how threads are scheduled. To keep things simple, I'm going to ignore the Producer for the moment.

I'm going to name the Consumer threads A through D. When the program starts, the order in which the Consumers start is non-deterministic, so I'm going to order the threads by their start time and then name them. This means A starts first, B second, C third, and D last.

Assume there is no message in the Drop instance (it's empty). A, B, C, and D all call drop.take(). Since A started first, assume A is rewarded with the lock. B, C, and D are now all blocked on the intrinsic lock of the Drop object.

From A's perspective, this happens:
  1. drop.take() determines the message is empty
  2. A wait()s; which releases the lock
Now the lock is released. We'll assume the thread scheduling is fair (it doesn't have to be) and that B obtains the lock. B then:
  1. drop.take() determines the message is empty
  2. B wait()s; which releases the lock
And so on. At the end of this process, A, B, C, and D have all released the lock on the Drop instance, and all of them are waiting to be notified.

Here is where it gets ugly.


The Producer thread wakes up and decides it's time to put a message in the Drop object. It calls drop.put(). After storing the message and setting empty to false, drop.put() invokes notifyAll().


What happens next?

The naive answer is: a Consumer thread consumes the message. This answer did not satisfy me. Does a woken Consumer run inside the notifyAll() call? Does it resume immediately, even though the Producer is still inside drop.put()?

To answer these questions, you have to think about the intrinsic lock as well as the blocking in wait().

Remember that notifyAll() has been called within drop.put(), which means every thread waiting on the Drop object's monitor has been woken. But the Producer still owns the lock (the method is synchronized), so no other thread can begin executing inside the Drop object until the drop.put() call returns. The call to notifyAll() does not relinquish the lock. (This part of the answer is in the documentation; just remember that the intrinsic lock is the same as the monitor.)

Each of the Consumer threads is sitting in wait(). Earlier I said this relinquished the lock; that was only half the story. When a thread calls wait(), it gives up the lock in order to obtain it again later: before wait() can return, the thread must reacquire the lock. A woken thread is no longer waiting for a notification, but it is still waiting for the lock.

I find it easier to think about the code that immediately follows the wait(). The next thing the thread executes is still inside the synchronized method drop.take(): the re-check of the while (empty) condition. Executing inside a synchronized method requires the lock, so the thread cannot return from wait() until it obtains the lock on the Drop object. This teaches me that being notified is not the same as running.
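Here is a sketch of a woken thread having to wait for the lock, using my own hypothetical class NotifyHoldsLock rather than the Drop. The producer side calls notifyAll() and then deliberately sleeps for 200 ms while still inside its synchronized block. The consumer is awake from that point on, but it cannot record that wait() has returned until the producer leaves the block.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch: notifyAll() wakes a waiting thread, but the woken thread cannot
// return from wait() until the notifying thread releases the lock.
// NotifyHoldsLock is a hypothetical class, not from the tutorial.
class NotifyHoldsLock {
    private final Object lock = new Object();
    private boolean hasMessage = false; // guarded by lock
    private final List<String> events = new CopyOnWriteArrayList<>();

    static List<String> run() {
        NotifyHoldsLock demo = new NotifyHoldsLock();
        Thread consumer = new Thread(() -> {
            synchronized (demo.lock) {
                while (!demo.hasMessage) {
                    try {
                        demo.lock.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                // Only reachable once wait() has reacquired the lock.
                demo.events.add("consumer: wait() returned");
            }
        });
        try {
            consumer.start();
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(1); // let the consumer reach wait()
            }
            synchronized (demo.lock) { // plays the role of drop.put()
                demo.hasMessage = true;
                demo.lock.notifyAll();
                demo.events.add("producer: notifyAll() called");
                Thread.sleep(200); // still holding the lock
                demo.events.add("producer: leaving synchronized block");
            }
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return demo.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The returned list is always in the order "producer: notifyAll() called", "producer: leaving synchronized block", "consumer: wait() returned". The sleep only makes the gap visible; the ordering holds without it, because the consumer's log line requires the lock.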

Therefore, once the Producer returns from drop.put(), the Consumers compete for the lock. One of them will get it (suppose it's C). The following occurs:
  1. Reacquire the lock, so wait() returns inside drop.take()
  2. Re-check empty and find that there is a message
  3. Set empty back to true
  4. Call notifyAll()
  5. Return the message from drop.take(), releasing the lock
Now the lock on the Drop object is free. One thread is given the lock, say it's A.
  1. Reacquire the lock, so wait() returns inside drop.take()
  2. Re-check empty and find that there is no message
  3. wait() again, releasing the lock
Now the lock on the Drop object is free. One thread is given the lock, say it's D.
  1. Reacquire the lock, so wait() returns inside drop.take()
  2. Re-check empty and find that there is no message
  3. wait() again, releasing the lock
What this looks like is a cascade of lock acquisitions and releases: each thread is given the lock just so that wait() can return, and then it almost immediately calls wait() again. The next notifyAll() will start another cascade.
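To put a number on the cascade, here is a sketch that adds two counters to a copy of the Drop (CountingDrop, wakeups, wastedWakeups and measureWastedWakeups are my own hypothetical names). It parks four consumers named A through D in take(), puts one message, waits until every woken consumer has reacquired the lock, and reports how many of them found the Drop empty again.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: the tutorial's Drop plus counters for wakeups that find nothing to do.
// CountingDrop is a hypothetical class, not from the tutorial.
class CountingDrop {
    private String message;
    private boolean empty = true;
    private int wakeups = 0;       // times wait() returned inside take()
    private int wastedWakeups = 0; // ...and the Drop was still empty

    synchronized String take() throws InterruptedException {
        while (empty) {
            wait();
            wakeups++;
            if (empty) {
                wastedWakeups++; // woke, reacquired the lock, nothing to take
            }
        }
        empty = true;
        notifyAll();
        return message;
    }

    synchronized void put(String message) throws InterruptedException {
        while (!empty) {
            wait();
        }
        this.message = message;
        empty = false;
        notifyAll();
    }

    synchronized int wakeups() { return wakeups; }
    synchronized int wastedWakeups() { return wastedWakeups; }

    // Parks consumers A-D in take(), puts one message, and reports how many
    // of the resulting wakeups found the Drop empty again.
    static int measureWastedWakeups() {
        CountingDrop drop = new CountingDrop();
        List<Thread> consumers = new ArrayList<>();
        for (String name : new String[] {"A", "B", "C", "D"}) {
            Thread t = new Thread(() -> {
                try {
                    drop.take();
                } catch (InterruptedException e) {
                    // used below to stop the consumers that lost the race
                }
            }, name);
            consumers.add(t);
            t.start();
        }
        try {
            for (Thread t : consumers) {
                while (t.getState() != Thread.State.WAITING) {
                    Thread.sleep(1); // wait until every consumer is in wait()
                }
            }
            drop.put("widget #1"); // one notifyAll() wakes all four
            long deadline = System.currentTimeMillis() + 5000;
            while (drop.wakeups() < consumers.size()
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(1); // let each woken consumer reacquire the lock
            }
            for (Thread t : consumers) {
                t.interrupt();
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return drop.wastedWakeups();
    }

    public static void main(String[] args) {
        System.out.println("wasted wakeups: " + measureWastedWakeups());
    }
}
```

With four waiting consumers it should normally report 3: one consumer gets the message, and the other three pay for a lock acquisition and a state check only to wait() again. The 5 second deadline is just a guard so the sketch cannot hang.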

Because notifyAll() wakes every thread waiting on the object's monitor, each notification costs one lock acquisition and one state check per waiting thread, even though only one of those threads can make progress.
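The tutorial does not go further, but for comparison here is one commonly used way to avoid waking everyone: java.util.concurrent.locks.ReentrantLock with two Condition objects, one that consumers wait on (notEmpty) and one that producers wait on (notFull), plus signal() instead of notifyAll(). This is a swapped-in technique rather than the tutorial's, and ConditionDrop, runDemo and the DONE end marker are my own hypothetical names. A woken consumer can still lose the lock to another consumer that arrives in between, so this reduces the cascade rather than eliminating every wasted wakeup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of a Drop variant built on ReentrantLock with two Conditions, so
// put() wakes one consumer and take() wakes one producer.
// ConditionDrop is a hypothetical class, not from the tutorial.
class ConditionDrop {
    private static final String DONE = "DONE"; // hypothetical end marker

    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Condition notFull = lock.newCondition();  // producers wait here
    private String message;
    private boolean empty = true;

    String take() throws InterruptedException {
        lock.lock();
        try {
            while (empty) {
                notEmpty.await();
            }
            empty = true;
            notFull.signal(); // wake one waiting producer, not every thread
            return message;
        } finally {
            lock.unlock();
        }
    }

    void put(String message) throws InterruptedException {
        lock.lock();
        try {
            while (!empty) {
                notFull.await();
            }
            this.message = message;
            empty = false;
            notEmpty.signal(); // wake one waiting consumer, not every thread
        } finally {
            lock.unlock();
        }
    }

    // One producer sends 1..count, then one DONE per consumer;
    // four consumers add up the numbers they receive.
    static long runDemo(int count) {
        ConditionDrop drop = new ConditionDrop();
        AtomicLong sum = new AtomicLong();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < 4; c++) {
            threads.add(new Thread(() -> {
                try {
                    for (String m = drop.take(); !m.equals(DONE); m = drop.take()) {
                        sum.addAndGet(Long.parseLong(m));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        threads.add(new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    drop.put(Integer.toString(i));
                }
                for (int k = 0; k < 4; k++) {
                    drop.put(DONE); // one end marker per consumer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        threads.forEach(Thread::start);
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(100)); // prints 5050
    }
}
```

runDemo(100) returns 5050, the sum of 1 through 100, which shows every message was delivered exactly once. If you don't need to write the lock yourself, OpenJDK's java.util.concurrent.ArrayBlockingQueue is built on the same one-lock, two-condition design, and a capacity of 1 gives a one-slot drop.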

Adding a second Producer to the situation outlined above muddies things further, because the Producers now compete with each other as well. When a Consumer's notifyAll() wakes both Producers, only one of them can fill the Drop. The other reacquires the lock, finds the Drop full again, and goes back to wait(), having been woken for nothing.

Understanding a producer and consumer relationship through the Java tutorial example is anything but simple. The lock is managed by two interacting mechanisms, synchronized methods and wait()/notifyAll(), and notifyAll() complicates every method that calls wait().
