Showing posts with label Java. Show all posts

Thursday, October 28, 2010

Threads in Java can be simpler

Apologies for the delay; I waited until after my class project presentation to post this.

In a previous post I complained about the producer and consumer example in the Java concurrency tutorial. It showed how unimpressive and needlessly complicated threading can be in Java. What bothers me most isn't the concept of wait() and notify(); it's the decision to explain wait() and notify() before simpler methods of synchronization.

Java has quite a few helper classes, in the java.util.concurrent package, that make synchronization much simpler. Semaphore is one of them. I have constructed a simpler version of the Drop class using two Semaphores.

import java.util.concurrent.Semaphore;

public class Drop {
    private static final int MAX_MESSAGE = 10;
    private String[] messages = new String[MAX_MESSAGE];
    private int next_available;
    // Counts stored messages; take() blocks until one is available.
    private Semaphore taking = new Semaphore(0, true);
    // Counts free slots; put() blocks while the array is full.
    private Semaphore putting =
        new Semaphore(MAX_MESSAGE, true);

    public void put(String message) {
        // Swallowing InterruptedException would let the thread continue
        // without a permit, so wait uninterruptibly instead.
        putting.acquireUninterruptibly();
        // put() and take() must lock the same object (this) to be atomic.
        synchronized (this) {
            messages[next_available] = message;
            next_available++;
        }
        taking.release();
    }

    public String take() {
        String message;
        taking.acquireUninterruptibly();
        synchronized (this) {
            next_available--;
            message = messages[next_available];
        }
        putting.release();
        return message;
    }
}

There are three conceptual pieces that make this new Drop work: two Semaphores, putting and taking, and the synchronized code blocks.

  1. The putting Semaphore counts free slots, so put() blocks instead of overflowing the array.
  2. The taking Semaphore counts stored messages, so take() blocks instead of underflowing it.
  3. The synchronized code blocks use the intrinsic lock on the Drop instance to create an atomic section, protecting against concurrent manipulation of the messages array and the next_available index into it.
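The Drop above pushes and pops at next_available, so it behaves as a stack: the newest message is taken first. Here is a minimal sketch of the same three pieces arranged as a FIFO ring buffer instead, with a small stress run. The names RingDrop, RingDropDemo and run() are hypothetical, and the capacity of 3 is chosen only so that put() blocks often.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

public class RingDropDemo {
    // Hypothetical FIFO variant of the two-Semaphore Drop.
    static class RingDrop {
        private final String[] messages;
        private int head;                    // next slot to take, guarded by this
        private int tail;                    // next slot to fill, guarded by this
        private final Semaphore taking;      // counts stored messages
        private final Semaphore putting;     // counts free slots

        RingDrop(int capacity) {
            messages = new String[capacity];
            taking = new Semaphore(0, true);
            putting = new Semaphore(capacity, true);
        }

        void put(String message) throws InterruptedException {
            putting.acquire();               // blocks while the buffer is full
            synchronized (this) {            // atomic update of array and index
                messages[tail] = message;
                tail = (tail + 1) % messages.length;
            }
            taking.release();                // one more message to take
        }

        String take() throws InterruptedException {
            taking.acquire();                // blocks while the buffer is empty
            String message;
            synchronized (this) {
                message = messages[head];
                messages[head] = null;
                head = (head + 1) % messages.length;
            }
            putting.release();               // one more free slot
            return message;
        }
    }

    // Runs the given numbers of producers and consumers over a 3-slot buffer
    // and returns every message the consumers received.
    static List<String> run(int producers, int consumers, int perProducer) {
        int total = producers * perProducer;
        if (total % consumers != 0) {
            throw new IllegalArgumentException("messages must split evenly");
        }
        RingDrop drop = new RingDrop(3);
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < perProducer; i++) drop.put("P" + id + "-" + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (int c = 0; c < consumers; c++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < total / consumers; i++) received.add(drop.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 4, 100).size() + " messages received"); // 200 messages received
    }
}
```

With one producer and one consumer the messages arrive in the order they were sent; with several consumers each message still arrives exactly once, though the interleaving varies from run to run.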

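These three pieces are much easier to understand than a broadcast system. One might guess that Semaphore is built on the broadcasting wait() and notify() system underneath; in OpenJDK it is actually built on AbstractQueuedSynchronizer, which parks and unparks individual threads. Either way, the user of the class never has to see it.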

Using the provided utilities, the example becomes far simpler. There's no need to re-check the state of the Drop when a thread resumes, and no cascade of threads waking up only to go back to sleep.

Synchronization in Java could be presented as a simple use of the available utilities.
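Going one step further, java.util.concurrent already ships a bounded buffer: ArrayBlockingQueue, whose put() blocks while the queue is full and whose take() blocks while it is empty. A sketch, assuming a capacity of 10 to match MAX_MESSAGE and a single producer and consumer; QueueDropDemo and run() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueDropDemo {
    // Sends 'count' messages from one producer thread to one consumer thread
    // through a bounded queue and returns what the consumer received.
    static List<String> run(int count) {
        BlockingQueue<String> drop = new ArrayBlockingQueue<>(10);
        List<String> received = new ArrayList<>();   // touched only by the consumer
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    drop.put("widget #: " + i);      // blocks while 10 messages are queued
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    received.add(drop.take());       // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();                         // join() makes 'received' visible here
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> received = run(25);
        // prints "widget #: 1 ... widget #: 25"
        System.out.println(received.get(0) + " ... " + received.get(received.size() - 1));
    }
}
```

No Semaphore, synchronized block, wait() or notifyAll() appears in user code; the queue handles all of it internally.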

Monday, September 13, 2010

Threads in Java are not simple

To prepare for an upcoming project in real-time processing, I needed an education in Java threads. I wanted to build a simple producer and consumer project with each producer and consumer given its own thread. There would be multiple producers and consumers, and possibly multiple items in the work queue.

This was supposed to be easy, as Java was designed with threading in mind. What follows is my experience, then an explanation of the costly scheduling that the sanctioned example produces. The length of this post is a testament to the simplicity of implementing synchronized threads in Java.

Working my way through the Java tutorials, I found this example in the Guarded Blocks lesson. The heart of the example is the Drop class: it is the resource that multiple threads access. Through an instance of the Drop class, the Producers convey messages to the Consumers.

The Drop class with the commentary removed:

Drop.java
    public class Drop {
        private String message;
        private boolean empty = true;

        public synchronized String take() {
            while (empty) {
                try {
                    wait();
                } catch (InterruptedException e) {}
            }
            empty = true;
            notifyAll();
            return message;
        }

        public synchronized void put(String message) {
            while (!empty) {
                try { 
                    wait();
                } catch (InterruptedException e) {}
            }
            empty = false;
            this.message = message;
            notifyAll();
        }
    }

The purpose of the Drop class is to prevent concurrent access to the message member. This is achieved by using the Object's intrinsic lock (also called the monitor lock, or the monitor).


A thread obtains the lock by calling, and entering, a synchronized method. Any other thread that calls a synchronized method on the same object is blocked until the lock is released. In this example the lock can be released in two ways: the synchronized call returns, or the thread calls wait().
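A small sketch of the intrinsic lock at work, assuming a hypothetical Counter class: because only one thread at a time can hold the lock on a given Counter, no increments are lost. Without synchronized, the value++ read-modify-write from different threads could interleave, and updates would typically go missing.

```java
public class IntrinsicLockDemo {
    static class Counter {
        private int value;

        // Equivalent to wrapping the body in synchronized (this) { ... }:
        // a thread must hold this Counter's intrinsic lock to run it.
        public synchronized void increment() {
            value++;                 // read-modify-write, safe only under the lock
        }

        public synchronized int get() {
            return value;
        }
    }

    // Starts 'threads' threads that each increment a shared Counter
    // 'perThread' times, waits for them, and returns the final count.
    static int run(int threads, int perThread) {
        Counter counter = new Counter();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) counter.increment();
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 100_000)); // 400000
    }
}
```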


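To understand the process, we need to understand wait() and notifyAll(). When a thread wait()s, it releases the lock and blocks until another thread calls notify() or notifyAll() on the same object. notifyAll() wakes every thread waiting on that object; it does not throw anything at them. The InterruptedException the examples catch is thrown only if a waiting thread is interrupted with Thread.interrupt().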
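The difference between being notified and being interrupted can be seen directly. In this sketch (WaitNotifyDemo and wakeWith() are hypothetical names) a thread waits on a shared object and is then woken either by notifyAll() or by Thread.interrupt(); only the interrupt makes wait() throw.

```java
public class WaitNotifyDemo {
    private static final Object lock = new Object();

    // Starts a thread that waits on 'lock', wakes it with notifyAll() or with
    // interrupt(), and reports how the thread's wait() ended.
    static String wakeWith(boolean useInterrupt) {
        final String[] outcome = new String[1];
        final boolean[] woken = {false};               // guarded by lock
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (!woken[0]) {
                        lock.wait();                   // releases lock while waiting
                    }
                    outcome[0] = "returned normally";
                } catch (InterruptedException e) {
                    outcome[0] = "InterruptedException";
                }
            }
        });
        waiter.start();
        pause(50);            // let the waiter reach wait(); the result doesn't depend on it
        if (useInterrupt) {
            waiter.interrupt();                        // this is what makes wait() throw
        } else {
            synchronized (lock) {
                woken[0] = true;
                lock.notifyAll();                      // only wakes the waiter
            }
        }
        try {
            waiter.join();                             // makes outcome[0] visible here
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return outcome[0];
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("notifyAll(): " + wakeWith(false)); // returned normally
        System.out.println("interrupt(): " + wakeWith(true));  // InterruptedException
    }
}
```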

It's important to understand that each thread can block at two levels: while trying to enter (or re-enter) a synchronized method without holding the intrinsic lock, and while wait()ing.
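Java exposes these two levels through Thread.getState(): a thread parked in wait() reports WAITING, while a thread trying to enter a synchronized block whose lock another thread holds reports BLOCKED. A sketch, with hypothetical names, that polls for each state:

```java
public class BlockingLevelsDemo {
    private static final Object lock = new Object();

    // Polls t.getState() until it equals 'target' or about 5 seconds pass,
    // and returns the last state observed.
    static Thread.State settle(Thread t, Thread.State target) {
        long deadline = System.nanoTime() + 5_000_000_000L;
        Thread.State state = t.getState();
        while (state != target && System.nanoTime() - deadline < 0) {
            Thread.yield();
            state = t.getState();
        }
        return state;
    }

    // Returns {state of a thread inside wait(), state of a thread trying to
    // enter a synchronized block whose lock is held by another thread}.
    static Thread.State[] run() {
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait();            // level 2: waiting to be notified
                    }
                } catch (InterruptedException e) {
                    // interrupted at the end of the demo; just exit
                }
            }
        });
        Thread entrant = new Thread(() -> {
            synchronized (lock) {               // level 1: needs the lock to enter
                // nothing to do once inside
            }
        });
        Thread.State[] seen = new Thread.State[2];
        waiter.start();
        seen[0] = settle(waiter, Thread.State.WAITING);
        synchronized (lock) {                   // main holds the intrinsic lock...
            entrant.start();
            seen[1] = settle(entrant, Thread.State.BLOCKED); // ...so entrant blocks
        }
        waiter.interrupt();
        try {
            waiter.join();
            entrant.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        Thread.State[] seen = run();
        System.out.println("in wait(): " + seen[0] + ", entering: " + seen[1]);
    }
}
```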


In the Drop class when modifying or retrieving the message, the procedure is:

  1. Check whether the Drop is in the wrong state (empty for take(), full for put())
  2. If it is, wait()
    • When notified, go to 1.
  3. Modify or retrieve message
  4. Modify empty
  5. Notify all threads

My Producer and Consumer classes are different from those in the example. They run forever and identify each Producer and Consumer by its thread name. Producers produce messages indefinitely, and Consumers read those messages forever, printing them to System.out.

Producer.java
    import java.util.Random;

    public class Producer implements Runnable {
        private Drop drop;

        public Producer(Drop drop) {
            this.drop = drop;
        }

        public void run() {
            Random random = new Random();
            String pre = Thread.currentThread().getName()
                       + " widget #: ";
            int i = 1;

            while (true) {
                drop.put(pre + i++);
                try {
                    Thread.sleep(random.nextInt(5000));
                } catch (InterruptedException e) { }
            }
        }
    }

Consumer.java
    import java.util.Random;

    public class Consumer implements Runnable {
        private Drop drop;

        public Consumer(Drop drop) {
            this.drop = drop;
        }

        public void run() {
            Random random = new Random();
            while (true) {
                String message = drop.take();
                System.out.println(
                    Thread.currentThread().getName() + 
                    " MESSAGE RECEIVED: " + message);
                try {
                    Thread.sleep(random.nextInt(5000));
                } catch (InterruptedException e) {}
            }
        }
    }

Lastly, a driver is needed to start the producers and consumers. To simplify the scheduling explanation, there will be one Producer and four Consumers.

ProducerConsumerExample.java
    public class ProducerConsumerExample {
        public static void main(String[] args) {
            Drop drop = new Drop();
            (new Thread(new Producer(drop))).start();
            (new Thread(new Consumer(drop))).start();
            (new Thread(new Consumer(drop))).start();
            (new Thread(new Consumer(drop))).start();
            (new Thread(new Consumer(drop))).start();
        }
    }

That's a lot of code, and a long introduction. Now we're ready to look at how the threads are scheduled. To keep things simple, I'm going to ignore the Producer for the moment.

I'm going to name the Consumer threads A through D. When the program starts, it's non-deterministic which Consumer will start first, second, third, or fourth. However, I'm going to order the threads by their start time and then name them: A starts first, B second, C third, and D last.

Assuming there is no message in the Drop instance (it's empty) the following happens. A, B, C, and D all call drop.take(). Since A started first, A is rewarded with the lock. B, C, and D are now all blocked on the intrinsic lock of the Drop object.

From A's perspective, this happens:
  1. drop.take() determines the message is empty
  2. A wait()s, which releases the lock
Now the lock is released. We'll assume the thread scheduling is fair (it doesn't have to be) and B obtains the lock. B then:
  1. drop.take() determines the message is empty
  2. B wait()s, which releases the lock
And so on. At the end of this process A, B, C, and D have all relinquished the lock on the Drop instance, and they're all waiting to be notified.

Here is where it gets ugly.


The Producer thread wakes up and decides it's time to put a message in the Drop object. It calls drop.put(). After storing the message and setting empty to false, drop.put() invokes notifyAll().


What happens next?

The naive answer is that a Consumer thread consumes the message. This answer did not satisfy me. Does the consumer thread execute in the context of notifyAll()? Does it resume the instant it is notified?

To answer these questions you have to think about the intrinsic lock, as well as the blocking in wait().

Remember that notifyAll() has been called within the context of drop.put(), which means that every thread waiting in wait() on the Drop object has been woken. The Producer currently owns the lock (the method is synchronized), so no other thread can begin executing until the drop.put() call returns. The call to notifyAll() does not relinquish the lock. (This portion of the answer is in the Object.notifyAll() documentation; just remember that the intrinsic lock is the same as the monitor.)

Each of the Consumer threads is sitting in wait(). Earlier I said this relinquished the lock; that was only half the story. When a thread calls wait(), it gives up the lock in order to obtain it again later: once notified, wait() cannot return until the thread has reacquired the lock.
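The claim that notifyAll() keeps the lock can be checked with a sketch like the one below (NotifyKeepsLockDemo and run() are hypothetical names). The notifying thread keeps holding the lock for a while after notifyAll() and sets a flag as its last action; the woken thread can only read that flag after it has reacquired the lock, so it always sees the flag set.

```java
public class NotifyKeepsLockDemo {
    private static final Object lock = new Object();
    private static boolean ready = false;     // guarded by lock
    private static boolean released = false;  // guarded by lock

    // Returns the value of 'released' that the woken thread observed
    // once its wait() finally returned.
    static boolean run() {
        synchronized (lock) {                 // reset state for repeated runs
            ready = false;
            released = false;
        }
        final boolean[] sawReleased = new boolean[1];
        Thread consumer = new Thread(() -> {
            synchronized (lock) {
                while (!ready) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                // wait() has returned, so this thread holds the lock again.
                sawReleased[0] = released;
            }
        });
        consumer.start();
        pause(100);                           // let the consumer reach wait()
        synchronized (lock) {
            ready = true;
            lock.notifyAll();                 // wakes the consumer but keeps the lock
            pause(200);                       // consumer is notified yet cannot proceed
            released = true;                  // last write before the lock is released
        }
        try {
            consumer.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return sawReleased[0];
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("consumer saw released = " + run()); // true
    }
}
```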

I find it's easier to think of the line of code that immediately follows the wait(). The next line of code the thread is going to execute is within the synchronized method drop.take(), and the lock is required to execute in the context of a synchronized method. So before wait() can return, the thread must reacquire the lock on the Drop object. This teaches me that a notified thread does not resume immediately in Java.

Therefore each thread is in competition for the lock. One of them will be given the lock (suppose it's C). The following occurs:
  1. Reacquire the lock inside drop.take()
  2. Return from wait()
  3. Find that there is a message
  4. Read the message
  5. Set empty back to true
  6. Call notifyAll()
  7. Return from drop.take(), relinquishing the lock
Now the lock on the Drop object is free. One thread is given the lock, say it's A.
  1. Reacquire the lock inside drop.take()
  2. Return from wait()
  3. Find that there is no message
  4. wait() again, relinquishing the lock
Now the lock on the Drop object is free. One thread is given the lock, say it's D.
  1. Reacquire the lock inside drop.take()
  2. Return from wait()
  3. Find that there is no message
  4. wait() again, relinquishing the lock
What this looks like is a cascade of lock acquisitions and releases: each thread is given the lock only to return from wait() and almost immediately wait() again. The next notifyAll() sets off another cascade.
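The cascade can be counted. This sketch instruments the tutorial's Drop with a hypothetical wastedWakeups counter, incremented whenever a thread returns from wait() and finds it must wait again. It then parks four consumers on an empty Drop and puts one message. One consumer takes it; at least the other three wake for nothing, and more if the taker's own notifyAll() rouses a consumer that has already gone back to wait(). CascadeDemo and CountingDrop are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CascadeDemo {
    // The tutorial's Drop plus a counter of wakeups that found nothing to do.
    static class CountingDrop {
        private String message;
        private boolean empty = true;
        private int wastedWakeups = 0;

        public synchronized String take() throws InterruptedException {
            while (empty) {
                wait();
                if (empty) wastedWakeups++;   // woken, but must wait again
            }
            empty = true;
            notifyAll();
            return message;
        }

        public synchronized void put(String message) throws InterruptedException {
            while (!empty) {
                wait();
            }
            empty = false;
            this.message = message;
            notifyAll();
        }

        public synchronized int wastedWakeups() {
            return wastedWakeups;
        }
    }

    // Returns {messages taken, wasted wakeups} after one put to four waiters.
    static int[] run() {
        CountingDrop drop = new CountingDrop();
        AtomicInteger taken = new AtomicInteger();
        List<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            Thread t = new Thread(() -> {
                try {
                    drop.take();
                    taken.incrementAndGet();
                } catch (InterruptedException e) {
                    // interrupted at the end of the demo
                }
            });
            consumers.add(t);
            t.start();
        }
        for (Thread t : consumers) {          // all four must be parked in wait()
            while (t.getState() != Thread.State.WAITING) Thread.yield();
        }
        try {
            drop.put("the only message");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        long deadline = System.currentTimeMillis() + 5000;
        while ((taken.get() < 1 || drop.wastedWakeups() < 3)
                && System.currentTimeMillis() < deadline) {
            Thread.yield();
        }
        int[] result = {taken.get(), drop.wastedWakeups()};
        consumers.forEach(Thread::interrupt); // release the consumers still waiting
        for (Thread t : consumers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " message taken, " + r[1] + " wasted wakeups");
    }
}
```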

Because notifyAll() wakes every thread waiting on the object, each woken thread pays the overhead of reacquiring the lock and re-checking the object's state, even though only one of them can make progress.

In the situation outlined above, adding a second Producer makes things even less clear. The second Producer will awaken the first: after it adds a message to the Drop object, its notifyAll() is needlessly noticed by the first Producer.

Understanding a producer and consumer relationship through the Java tutorial example is anything but simple. The lock is managed through two competing systems, synchronized methods and notifyAll(), with notifyAll() complicating every method that wait()s.