wait(), notify() and notifyAll() can be used by your custom concurrent Data Structures
wait() and notify() are low-level method calls that can be used to build more robust concurrent components. The wait method has pitfalls and must be coded properly; more about this later.
The following example is not coded properly, but it illustrates the principle behind wait and notify: one thread sits waiting on a monitor until another thread notifies it on the same monitor. Why the wait() call is not coded properly is explained later.
@Test
public void waitNotifyTest() throws Exception {
    // A dedicated "new Object()" would be a safer monitor: string literals are
    // interned, so unrelated code could end up locking on the same instance.
    String monitor = "monitor";
    ExecutorService executorService = Executors.newFixedThreadPool( 2 );

    // Waiter: blocks until another thread notifies on the same monitor
    Callable<String> callableWait = new Callable<>() {
        @Override
        public String call() throws Exception {
            synchronized ( monitor ) {
                monitor.wait();
            }
            return "notified";
        }
    };
    Future<String> future = executorService.submit( callableWait );

    // Give the waiter time to reach wait() before the notifier runs
    Thread.sleep( 1000 );

    // Notifier: wakes every thread waiting on the monitor
    Runnable runnableNotify = new Runnable() {
        @Override
        public void run() {
            synchronized ( monitor ) {
                monitor.notifyAll();
                System.out.println( "Notified others" );
            }
        }
    };
    executorService.submit( runnableNotify );
    executorService.shutdown();
    String message = future.get();
    Assert.assertEquals( message, "notified" );
}
Note that in order for wait and notify to work we need either:
both the waiting code and the notifying code to be inside synchronized methods of the same object, which implicitly lock on that instance of the class,
or, as in this example, both to be inside synchronized blocks which are synchronized on the same monitor instance.
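The requirement above can be checked with a small sketch. The class and method names here are hypothetical and not part of the original example. It assumes only the documented behaviour of Object: calling notifyAll() (or wait() or notify()) on a monitor the current thread does not own throws IllegalMonitorStateException.

```java
// Hypothetical demo class; the names are illustrative, not from the article's repository.
class MonitorOwnershipDemo {

    // Calls notifyAll() WITHOUT holding the monitor and reports what happens.
    static String notifyWithoutLock() {
        Object monitor = new Object();
        try {
            monitor.notifyAll(); // not inside synchronized ( monitor )
            return "no exception";
        } catch (IllegalMonitorStateException e) {
            return "IllegalMonitorStateException";
        }
    }

    // Makes the same call while holding the monitor.
    static String notifyWithLock() {
        Object monitor = new Object();
        synchronized (monitor) {
            monitor.notifyAll(); // legal: the current thread owns the monitor
        }
        return "no exception";
    }

    public static void main(String[] args) {
        System.out.println(notifyWithoutLock());
        System.out.println(notifyWithLock());
    }
}
```

The same rule applies to wait(): it must be called while holding the monitor it is invoked on.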
In the above code example we used notifyAll(). That method wakes ALL threads that are waiting on the monitor. notify() wakes just one of the threads waiting on wait(). Which thread it wakes is not deterministic, i.e. it is not necessarily the thread which has been waiting the longest. Each woken thread must reacquire the monitor before it returns from wait().
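As a rough illustration of notifyAll() releasing several waiters at once, here is a minimal sketch. The class name, the released flag and the woken counter are assumptions made for this example. The flag is there so that the outcome does not depend on thread timing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of the article's repository.
class NotifyAllDemo {
    private final Object monitor = new Object();
    private boolean released = false; // guarded by monitor
    private int woken = 0;            // guarded by monitor

    // Starts waiterCount threads, releases them together and returns how many got through.
    int releaseAll(int waiterCount) {
        List<Thread> waiters = new ArrayList<>();
        for (int i = 0; i < waiterCount; i++) {
            Thread waiter = new Thread(() -> {
                synchronized (monitor) {
                    while (!released) {
                        try {
                            monitor.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    woken++;
                }
            });
            waiter.start();
            waiters.add(waiter);
        }
        synchronized (monitor) {
            released = true;
            // With notify() here, only one already-waiting thread would be woken
            // and the others could stay blocked in wait().
            monitor.notifyAll();
        }
        for (Thread waiter : waiters) {
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        synchronized (monitor) {
            return woken;
        }
    }

    public static void main(String[] args) {
        System.out.println(new NotifyAllDemo().releaseAll(3));
    }
}
```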
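Note that calling wait() on its own, as in the following line, is not safe:
monitor.wait();
The reason is that a waiting thread can wake up even though no other thread has called notify() or notifyAll(). This is known as a spurious wakeup. The documentation for Object.wait() explicitly permits it. Java implements wait() in native code on top of the operating system's threading primitives, and those primitives may release a waiting thread early.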
For this reason the waiting code should be more like this:
synchronized( monitor ) {
    // Re-check the condition after every wake-up, spurious or not
    while ( !getConditionMet() ) {
        monitor.wait();
    }
}
The other thread which is doing the notify will do something like:
synchronized( monitor ) {
    setConditionMet( true );
    monitor.notifyAll();
}
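Putting the two fragments together, a self-contained sketch might look like the following. The class GuardedGate and its method names are hypothetical. Besides spurious wakeups, the condition flag also covers the case where the notification happens before the other thread starts waiting, which the earlier test only avoided by sleeping for a second.

```java
// Hypothetical class combining the guarded wait and the notify shown above.
class GuardedGate {
    private final Object monitor = new Object();
    private boolean conditionMet = false; // guarded by monitor

    // Blocks until the condition has been set, however many times wait() wakes up.
    void awaitCondition() throws InterruptedException {
        synchronized (monitor) {
            while (!conditionMet) {
                monitor.wait();
            }
        }
    }

    // Sets the condition and wakes every waiting thread.
    void signalCondition() {
        synchronized (monitor) {
            conditionMet = true;
            monitor.notifyAll();
        }
    }

    // Notifies BEFORE anyone waits. A bare wait() would block forever here;
    // the guarded loop sees the flag and never calls wait() at all.
    static String notifyFirstThenWait() {
        GuardedGate gate = new GuardedGate();
        gate.signalCondition();
        try {
            gate.awaitCondition();
            return "notified";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(notifyFirstThenWait());
    }
}
```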
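It is for this reason that books say do not use wait and notify yourself but instead use Locks or Locks with Conditions. The Lock and Condition implementations in java.util.concurrent are not built on wait and notify; they park and unpark threads through lower level facilities such as LockSupport. Condition.await() is also allowed to wake spuriously, so it belongs in a loop which re-checks the condition too.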
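For comparison, the same wait-until-condition pattern can be sketched with a ReentrantLock and a Condition. The class LockGate and its method names are made up for this illustration; the Lock and Condition calls are the standard java.util.concurrent.locks API.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class; a Lock/Condition counterpart to a wait/notify gate.
class LockGate {
    private final Lock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private boolean open = false; // guarded by lock

    // Blocks until open() has been called.
    void awaitOpen() throws InterruptedException {
        lock.lock();
        try {
            while (!open) {      // await() may also wake spuriously
                opened.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Sets the flag and wakes every thread waiting on the condition.
    void open() {
        lock.lock();
        try {
            open = true;
            opened.signalAll();  // counterpart of notifyAll()
        } finally {
            lock.unlock();
        }
    }

    // Opens the gate from a second thread while the calling thread waits.
    static String demo() {
        LockGate gate = new LockGate();
        Thread opener = new Thread(gate::open);
        opener.start();
        try {
            gate.awaitOpen();
            opener.join();
            return "notified";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One practical difference is that a single Lock can have several Conditions, whereas an object monitor has only one wait set.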
However, if you are coding your own Data Structure you may not want to drag other heavyweight concurrency components into it. Which approach is appropriate depends on your level of knowledge.
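As an illustration of a custom Data Structure built directly on wait and notifyAll, here is a minimal bounded queue. It is a sketch only: the class TinyBlockingQueue is hypothetical, it uses synchronized methods (so the monitor is the queue instance itself), and production code would normally use something like java.util.concurrent.ArrayBlockingQueue instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bounded queue; put() blocks when full, take() blocks when empty.
class TinyBlockingQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    TinyBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();          // wait for space; loop guards against spurious wakeups
        }
        items.addLast(item);
        notifyAll();         // wake consumers waiting for an item
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();          // wait for an item
        }
        T item = items.removeFirst();
        notifyAll();         // wake producers waiting for space
        return item;
    }

    // A producer thread puts 1..5 into a queue of capacity 2; the caller sums what it takes.
    static int demo() {
        TinyBlockingQueue<Integer> queue = new TinyBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 5; i++) {
                sum += queue.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

notifyAll() is used rather than notify() because producers and consumers share one wait set, so a single notify() could wake a thread of the wrong kind.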
Another feature you are not supposed to use, and which is MUCH more EXTREME than wait, notify and notifyAll, is the Unsafe class, which is used internally by Data Structures like the JDK's ConcurrentHashMap. Application code cannot normally obtain an Unsafe instance without using reflection.
git clone https://github.com/spotadev/javas-examples.git
package com.javaspeak.java_examples.concurrency.waitnotify;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * @author John Dickerson - 15 Dec 2022
 */
public class WaitNotifyTest {

    @Test
    public void waitNotifyTest() throws Exception {
        String monitor = "monitor";
        ExecutorService executorService = Executors.newFixedThreadPool( 2 );

        // Waiter: blocks until another thread notifies on the same monitor
        Callable<String> callableWait = new Callable<>() {
            @Override
            public String call() throws Exception {
                synchronized ( monitor ) {
                    monitor.wait();
                }
                return "notified";
            }
        };
        Future<String> future = executorService.submit( callableWait );

        // Give the waiter time to reach wait() before the notifier runs
        Thread.sleep( 1000 );

        // Notifier: wakes every thread waiting on the monitor
        Runnable runnableNotify = new Runnable() {
            @Override
            public void run() {
                synchronized ( monitor ) {
                    monitor.notifyAll();
                    System.out.println( "Notified others" );
                }
            }
        };
        executorService.submit( runnableNotify );
        executorService.shutdown();
        String message = future.get();
        Assert.assertEquals( message, "notified" );
    }

    @Test
    public void waitTimeoutTest() throws Exception {
        String monitor = "monitor";
        ExecutorService executorService = Executors.newFixedThreadPool( 2 );

        // Nobody calls notify in this test, so wait( 1000 ) is expected to
        // return only once the timeout has elapsed
        Callable<String> callableWait = new Callable<>() {
            @Override
            public String call() throws Exception {
                synchronized ( monitor ) {
                    long before = System.nanoTime();
                    monitor.wait( 1000 );
                    long after = System.nanoTime();
                    long diff = after - before;
                    long diffMilli = diff / 1000000;
                    System.out.println( diffMilli );
                    // "failed" means the wait timed out rather than being notified
                    if ( diffMilli >= 1000 ) {
                        return "failed";
                    }
                }
                return "notified";
            }
        };
        Future<String> future = executorService.submit( callableWait );
        executorService.shutdown();
        Thread.sleep( 2000 );
        String message = future.get();
        Assert.assertEquals( message, "failed" );
    }
}
Page Author: JD