CountDownLatch is great for tests. You can block until all threads have called countDown() before checking the results.
You set up a CountDownLatch as follows:
CountDownLatch countDownLatch = new CountDownLatch( 4 );
Then you can pass that CountDownLatch into some Threads. Each thread will at some moment in time call:
countDownLatch.countDown();
You then have another thread which also has CountDownLatch passed to it. This last thread calls the method:
countDownLatch.await();
This waiting thread will block until countDown() has been called 4 times (here, once by each of the 4 threads).
This is quite useful when you want to wait for some work to be done by a bunch of threads and then use the data produced by those threads to do something else.
To get the code for this example:
git clone https://github.com/spotadev/java-examples.git
In both src/main/java and src/test/java navigate to this package:
com.javaspeak.java_examples.concurrency.countdownlatch
You can run the testng unit test using a testng plugin for your IDE or you can run the main method of:
CountDownLatchExampleTest
package com.javaspeak.java_examples.concurrency.countdownlatch;
import java.util.concurrent.CountDownLatch;
/**
* Contract for the CountDownLatch example: worker threads update a shared integer and a
* further thread waits on the same latch before post-processing that integer.
*
* @author John Dickerson - 2 Dec 2022
*/
public interface CountDownLatchExample {
/**
* Applies an increment to the shared state on behalf of one worker thread. The
* implementation in this example (CountDownLatchExampleImpl) adds the amount and then
* calls countDown() on the supplied latch.
*
* @param countDownLatch latch to count down once the increment has been applied
* @param amountToIncrement amount to add to the shared state
*/
void updateSharedState( CountDownLatch countDownLatch, int amountToIncrement );
/**
* @return the current value of the shared state
*/
int getSharedState();
/**
* Waits on the latch and then doubles the shared state. The implementation in this
* example (CountDownLatchExampleImpl) blocks in countDownLatch.await() before doubling.
*
* @param countDownLatch latch that the updating threads count down
* @throws InterruptedException if the calling thread is interrupted while waiting
*/
void doubleSharedStateAfterOtherThreadsComplete( CountDownLatch countDownLatch )
throws InterruptedException;
}
package com.javaspeak.java_examples.concurrency.countdownlatch;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Example implementation that keeps a single shared integer which several threads
 * increment, and which one further thread doubles once a {@link CountDownLatch} has
 * reached zero.
 *
 * <p>Thread-safety: every write to the shared state happens inside
 * {@code synchronized ( this )}; the field is also {@code volatile} so that
 * {@link #getSharedState()} can read the latest value without taking the lock.
 *
 * @author John Dickerson - 2 Dec 2022
 */
public class CountDownLatchExampleImpl implements CountDownLatchExample {

    // One logger per class, not per instance.
    private static final Logger logger =
            LoggerFactory.getLogger( CountDownLatchExampleImpl.class );

    private volatile int sharedState = 0;

    /**
     * Adds {@code amountToIncrement} to the shared state and then counts the latch down
     * by one.
     *
     * @param countDownLatch latch to count down after the increment has been applied
     * @param amountToIncrement amount to add to the shared state
     */
    @Override
    public void updateSharedState(
            CountDownLatch countDownLatch,
            int amountToIncrement ) {

        String threadName = Thread.currentThread().getName();
        logger.info( "Starting method updateSharedState of thread: {}", threadName );

        synchronized ( this ) {
            this.sharedState = this.sharedState + amountToIncrement;

            // Count down only after the increment is applied, so a thread released by
            // await() is guaranteed to see this contribution.
            countDownLatch.countDown();
        }

        logger.info( "Completed method updateSharedState of thread: {}", threadName );

        // Read outside the lock: the value is informational and may already have been
        // lowered further by other threads.
        logger.info( " Still to count down: {}", countDownLatch.getCount() );
    }

    /**
     * @return the current value of the shared state
     */
    @Override
    public int getSharedState() {
        return sharedState;
    }

    /**
     * Blocks until the latch reaches zero and then doubles the shared state.
     *
     * @param countDownLatch latch that the updating threads count down
     * @throws InterruptedException if the calling thread is interrupted while waiting on
     *         the latch
     */
    @Override
    public void doubleSharedStateAfterOtherThreadsComplete(
            CountDownLatch countDownLatch ) throws InterruptedException {

        String threadName = Thread.currentThread().getName();
        logger.info(
                "Starting method doubleSharedStateAfterOtherThreadsComplete in thread: {}",
                threadName );

        // this method blocks until the other threads have counted down the countDownLatch
        countDownLatch.await();

        synchronized ( this ) {
            this.sharedState = this.sharedState * 2;
            logger.info( "sharedState = {}", sharedState );
        }

        logger.info(
                "Completed method doubleSharedStateAfterOtherThreadsComplete in thread: {}",
                threadName );
    }
}
package com.javaspeak.java_examples.concurrency.countdownlatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.TestListenerAdapter;
import org.testng.TestNG;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * TestNG test showing a {@link CountDownLatch} in use: four tasks each add a value to the
 * shared state and count the latch down, while a fifth task waits on the latch and then
 * doubles the accumulated value.
 *
 * @author John Dickerson - 2 Dec 2022
 */
public class CountDownLatchExampleTest {

    private static final Logger logger =
            LoggerFactory.getLogger( CountDownLatchExampleTest.class );

    private CountDownLatchExample example;

    /**
     * Creates the example instance shared by the test methods of this class.
     */
    @BeforeClass
    public void setup() {
        example = new CountDownLatchExampleImpl();
    }

    /**
     * Submits one waiting task and one updating task per entry of {@code dataToSet}, then
     * verifies that the shared state equals the sum of the entries doubled.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the
     *         executor to terminate or for a task result
     * @throws ExecutionException if any submitted task threw an exception
     */
    @Test
    public void doTest() throws InterruptedException, ExecutionException {

        int[] dataToSet = new int[] { 1, 2, 3, 4 };

        // One count per updating task, so the latch opens exactly when all have finished.
        CountDownLatch countDownLatch = new CountDownLatch( dataToSet.length );

        // Five tasks share three pool threads. The pool needs more than one thread because
        // the waiting task is submitted first and holds a thread until the latch opens.
        ExecutorService executorService = Executors.newFixedThreadPool( 3 );

        // Slot 0 holds the waiting task, slots 1..n the updating tasks.
        Future<?>[] futures = new Future<?>[dataToSet.length + 1];

        try {
            // This task waits on the countDownLatch for the updating tasks to add the data.
            // It then doubles the state.
            Runnable awaitingRunnable = () -> {
                try {
                    example.doubleSharedStateAfterOtherThreadsComplete( countDownLatch );
                }
                catch ( InterruptedException e ) {
                    // Restore the flag so the pool thread still observes the interruption.
                    Thread.currentThread().interrupt();
                    logger.error( "Interrupted", e );
                }
            };

            futures[0] = executorService.submit( awaitingRunnable );

            int expectedSum = 0;

            // these tasks add the values in dataToSet
            for ( int i = 0; i < dataToSet.length; i++ ) {
                final int dataToIncrement = dataToSet[i];
                expectedSum += dataToIncrement;

                // internally calls countDownLatch.countDown()
                Runnable updatingRunnable =
                        () -> example.updateSharedState( countDownLatch, dataToIncrement );

                futures[i + 1] = executorService.submit( updatingRunnable );
            }

            // this prevents other tasks being run but lets existing scheduled ones finish
            // If this is not called before awaitTermination will not work
            executorService.shutdown();

            boolean terminatedWithoutTimeout =
                    executorService.awaitTermination( 5, TimeUnit.SECONDS );

            Assert.assertTrue( terminatedWithoutTimeout );

            // All tasks are finished here, so get() returns at once; it rethrows any
            // exception raised inside a task as an ExecutionException.
            for ( Future<?> future : futures ) {
                future.get();
            }

            // value should be ( 1 + 2 + 3 + 4 ) * 2.
            Assert.assertEquals( example.getSharedState(), expectedSum * 2 );
        }
        finally {
            // No-op after a clean termination; after a timeout or failure it interrupts
            // any task still blocked on the latch so no pool thread is leaked.
            executorService.shutdownNow();
        }
    }

    /**
     * Runs this test class programmatically through TestNG, without an IDE plugin.
     *
     * @param args ignored
     */
    public static void main( String[] args ) {
        TestListenerAdapter tla = new TestListenerAdapter();
        TestNG testng = new TestNG();
        testng.setTestClasses( new Class<?>[] { CountDownLatchExampleTest.class } );
        testng.addListener( tla );
        testng.run();
    }
}
Back: Concurrency
Page Author: JD