CyclicBarrier.await() blocks the calling thread until every other thread using the barrier has also called await().
The steps are:
CyclicBarrier cyclicBarrier = new CyclicBarrier( 4, runnable );
This initializes the CyclicBarrier for 4 threads. The optional runnable is the action to run once all 4 threads have reached the barrier.
The same cyclicBarrier instance is passed to each of the 4 threads. Each thread does its work and then calls:
cyclicBarrier.await();
Each thread blocks on await() until both of these have happened:
the last of the 4 threads has called cyclicBarrier.await()
the optional runnable has been run (it is run by that last thread to arrive)
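The steps above can be sketched as a small self-contained program. This is only a minimal sketch and not the code from the repository: the class name BarrierStepsSketch, the events list and the lambda used as the barrier action are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

// Hypothetical class name; not part of the java-examples repository.
class BarrierStepsSketch {

    // Runs 4 threads through one barrier and returns the events in the order they happened.
    static List<String> runOnce() {
        List<String> events = new CopyOnWriteArrayList<>();

        // Barrier action: run once, by the last thread to arrive, before any thread is released.
        Runnable barrierAction = () -> events.add( "action" );
        CyclicBarrier cyclicBarrier = new CyclicBarrier( 4, barrierAction );

        Thread[] threads = new Thread[4];
        for ( int i = 0; i < 4; i++ ) {
            threads[i] = new Thread( () -> {
                try {
                    events.add( "arrived" );
                    cyclicBarrier.await();   // blocks until all 4 threads have arrived
                    events.add( "released" );
                }
                catch ( InterruptedException e ) {
                    Thread.currentThread().interrupt();
                }
                catch ( BrokenBarrierException e ) {
                    events.add( "broken" );
                }
            } );
            threads[i].start();
        }

        // Wait for the 4 threads to finish so the event list is complete.
        for ( Thread thread : threads ) {
            try {
                thread.join();
            }
            catch ( InterruptedException e ) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException( e );
            }
        }
        return events;
    }

    public static void main( String[] args ) {
        System.out.println( runOnce() );
    }
}
```

Running main should print [arrived, arrived, arrived, arrived, action, released, released, released, released]: no thread gets past await() until all 4 have arrived and the barrier action has run.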
To get the code for this example:
git clone https://github.com/spotadev/java-examples.git
In both src/main/java and src/test/java navigate to this package:
com.javaspeak.java_examples.concurrency.cyclicbarrier
You can run the TestNG unit test with a TestNG plugin for your IDE, or you can run the main method of:
CyclicBarrierExampleTest
package com.javaspeak.java_examples.concurrency.cyclicbarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;

/**
 * Sums its share of the numbers, adds the sum to the shared results and waits at the barrier.
 *
 * @author John Dickerson - 23 Dec 2022
 */
public class WorkerRunnable implements Runnable {

    private Integer[] numbersToAdd;
    private CyclicBarrier cyclicBarrier;
    private ConcurrentLinkedQueue<Integer> results;

    public WorkerRunnable(
            Integer[] numbersToAdd,
            CyclicBarrier cyclicBarrier,
            ConcurrentLinkedQueue<Integer> results ) {

        this.numbersToAdd = numbersToAdd;
        this.cyclicBarrier = cyclicBarrier;
        this.results = results;
    }

    @Override
    public void run() {

        try {
            Integer sum = 0;

            for ( Integer number: numbersToAdd ) {
                sum = sum + number;
            }

            System.out.println(
                    Thread.currentThread().getName() + " finished work. Adding sum " +
                            sum + " to results" );

            results.add( sum );

            // this thread will be blocked by the await until all threads have reached await and
            // the AggregatorRunable has been run
            cyclicBarrier.await();
        }
        catch( InterruptedException e ) {
            // restore the interrupt flag instead of silently swallowing the interruption
            Thread.currentThread().interrupt();
        }
        catch( BrokenBarrierException e ) {
            // another thread was interrupted or timed out while waiting at the barrier
            System.err.println( Thread.currentThread().getName() + " found the barrier broken" );
        }
    }
}
package com.javaspeak.java_examples.concurrency.cyclicbarrier;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

/**
 * Barrier action: adds up the sums published by the workers once all of them have arrived.
 *
 * @author John Dickerson - 23 Dec 2022
 */
public class AggregatorRunable implements Runnable {

    private ConcurrentLinkedQueue<Integer> results;
    private Integer sumOfResults = 0;
    private CountDownLatch countDownLatch;

    public AggregatorRunable(
            ConcurrentLinkedQueue<Integer> results,
            CountDownLatch countDownLatch ) {

        this.results = results;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {

        // this runs on the last worker thread to call cyclicBarrier.await()
        System.out.println( Thread.currentThread().getName() + " is running the aggregate" );

        for ( Integer integer: results ) {
            sumOfResults = sumOfResults + integer;
        }

        // tell the waiting test thread that the total is ready
        countDownLatch.countDown();
    }

    public Integer getSumOfResults() {
        return sumOfResults;
    }
}
package com.javaspeak.java_examples.concurrency.cyclicbarrier;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.testng.Assert;
import org.testng.TestListenerAdapter;
import org.testng.TestNG;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * @author John Dickerson - 2 Dec 2022
 */
public class CyclicBarrierExampleTest {

    private AggregatorRunable aggregatorRunable;
    private WorkerRunnable[] workerRunnables;
    private CountDownLatch countDownLatch;

    // one row of numbers per worker thread
    private Integer[][] work = {
            { 1, 2, 3 },
            { 4, 5, 6 },
            { 7, 8, 9 },
            { 10, 11, 12 }
    };

    @BeforeClass
    public void setup() {

        // the test thread is not one of the 4 barrier threads, so it waits on this latch
        countDownLatch = new CountDownLatch( 1 );
        ConcurrentLinkedQueue<Integer> results = new ConcurrentLinkedQueue<>();
        aggregatorRunable = new AggregatorRunable( results, countDownLatch );

        // the aggregator runs once all 4 workers have called await()
        CyclicBarrier cyclicBarrier = new CyclicBarrier( 4, aggregatorRunable );
        workerRunnables = new WorkerRunnable[4];

        for ( int i=0; i<4; i++ ) {
            workerRunnables[ i ] = new WorkerRunnable(
                    work[i],
                    cyclicBarrier,
                    results );
        }
    }

    @Test
    public void doTest() throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool( 4 );

        for ( int i=0; i<4; i++ ) {
            executorService.submit( workerRunnables[i] );
        }

        executorService.shutdown();

        // blocks until the aggregator has counted the latch down
        countDownLatch.await();

        Integer expected = 0;

        for ( Integer[] workItem : work ) {
            for( Integer number: workItem ) {
                expected = expected + number;
            }
        }

        Integer actual = aggregatorRunable.getSumOfResults();
        Assert.assertEquals(actual.intValue(), expected.intValue() );
    }

    public static void main( String[] args ) {

        TestListenerAdapter tla = new TestListenerAdapter();
        TestNG testng = new TestNG();
        testng.setTestClasses( new Class[] { CyclicBarrierExampleTest.class } );
        testng.addListener( tla );
        testng.run();
    }
}
Both CyclicBarrier and CountDownLatch are initialized with a count:
CountDownLatch countDownLatch = new CountDownLatch( count );
CyclicBarrier cyclicBarrier = new CyclicBarrier( count, optionalRunnable );
Both CyclicBarrier and CountDownLatch have an await() method which blocks the calling thread:
countDownLatch.await();
cyclicBarrier.await();
The difference between the two is how the count is reduced:
countDownLatch.countDown() reduces the count without blocking the thread that calls it
cyclicBarrier.await() reduces the count and blocks the thread that calls it
Once the count is down to zero:
countDownLatch.await() unblocks
the last thread to call cyclicBarrier.await() runs the optionalRunnable, and then all the await() calls unblock
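The contrast can be sketched side by side. This is a minimal sketch under stated assumptions and not code from the repository: the class name LatchVersusBarrierSketch, the methods latchDemo and barrierDemo, and the helper awaitBarrier are hypothetical names chosen for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name; not part of the java-examples repository.
class LatchVersusBarrierSketch {

    // CountDownLatch: workers call countDown() and carry on; only the caller of await() blocks.
    static long latchDemo( int count ) {
        CountDownLatch countDownLatch = new CountDownLatch( count );
        for ( int i = 0; i < count; i++ ) {
            new Thread( countDownLatch::countDown ).start();   // reduces the count, never blocks
        }
        try {
            countDownLatch.await();                            // unblocks when the count is zero
        }
        catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException( e );
        }
        return countDownLatch.getCount();
    }

    // CyclicBarrier: every party calls await() and blocks until the last party has arrived.
    static int barrierDemo( int count ) {
        AtomicInteger actionRuns = new AtomicInteger();
        CyclicBarrier cyclicBarrier = new CyclicBarrier( count, actionRuns::incrementAndGet );
        for ( int i = 0; i < count - 1; i++ ) {
            new Thread( () -> awaitBarrier( cyclicBarrier ) ).start();
        }
        awaitBarrier( cyclicBarrier );   // the calling thread is the final party
        return actionRuns.get();
    }

    // Wraps the checked exceptions of await() so the demo methods stay short.
    private static void awaitBarrier( CyclicBarrier cyclicBarrier ) {
        try {
            cyclicBarrier.await();
        }
        catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException( e );
        }
        catch ( BrokenBarrierException e ) {
            throw new IllegalStateException( e );
        }
    }

    public static void main( String[] args ) {
        System.out.println( "latch count after await: " + latchDemo( 4 ) );
        System.out.println( "barrier action runs: " + barrierDemo( 4 ) );
    }
}
```

With a count of 4, latchDemo returns 0 and barrierDemo returns 1. In the latch case the 4 threads that count down never wait for each other, whereas in the barrier case all 4 parties, including the calling thread, wait for each other and the action runs exactly once.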
Page Author: JD