BlockOnGetMap is a custom map in which a thread blocks on get, waiting for another thread to put a value into the map.
One of the SpotADev projects, called Latency Ball, uses the JDK's Java NIO Selector and a UDP channel. The project uses UDP as its transport protocol because it is more effective than TCP for router hole puncturing (more commonly called hole punching). Hole puncturing is the process of opening up a router so that packets from the outside can pass through it and reach the application listening on a certain port.
The general rule of a router is that if you send a packet out to an external IP, the router opens up so that you can get a response. Hole puncturing works as follows.
Two computers, each behind its own router, start sending UDP packets to each other at roughly the same time. The first UDP packet to arrive at the destination router will likely be discarded, because the "you can only receive if you have sent" rule does not yet hold. However, once both computers have sent a packet out through their routers, both should be able to receive packets. Once both routers have opened up, we have successfully hole punctured.
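The router rule described above can be modelled in a few lines. The following is only a toy simulation, not real networking and not code from the Latency Ball project; the class ToyRouter and its methods sendOut and letsIn are hypothetical names chosen for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the "you can only receive if you have sent" rule; not real networking
class ToyRouter {

    private final Set<String> openedFor = new HashSet<>();

    // An outbound packet to remote opens this router for packets coming from remote
    void sendOut( String remote ) {
        openedFor.add( remote );
    }

    // An inbound packet passes only if we previously sent a packet to its source
    boolean letsIn( String source ) {
        return openedFor.contains( source );
    }

    public static void main( String[] args ) {
        ToyRouter routerA = new ToyRouter();
        ToyRouter routerB = new ToyRouter();

        // A's first packet leaves through router A and arrives at router B
        routerA.sendOut( "B" );
        System.out.println( routerB.letsIn( "A" ) ); // false: B has not sent to A yet

        // B's first packet leaves through router B and arrives at router A
        routerB.sendOut( "A" );
        System.out.println( routerA.letsIn( "B" ) ); // true: A already sent to B

        // A's next packet now gets through as well
        System.out.println( routerB.letsIn( "A" ) ); // true: both routers are open
    }
}
```

Real routers track ports and expire the opening after a period of silence, which this sketch deliberately ignores.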
One problem with UDP is that delivery is not guaranteed. If you care about guaranteed delivery, you therefore have to code yourself what TCP does for you.
This is where the BlockOnGetMap comes in.
The use case for BlockOnGetMap is as follows:
(i) You generate an ID.
(ii) You send a UDP packet to the destination and include the ID in it.
(iii) You call the get method of the BlockOnGetMap and sit waiting.
(iv) An NIO selector running in its own thread receives a UDP packet from the other party. Inside the packet is the same ID we previously sent. The selector thread then puts the response into the BlockOnGetMap.
(v) The thread blocking on the get method is now unblocked and receives the response.
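The five steps above amount to matching an asynchronous response to a blocked caller by ID. As a rough illustration, here is a minimal sketch of the same flow built on the JDK's CompletableFuture instead of BlockOnGetMap. This is a swapped-in technique, not the project's implementation, and the names PendingResponses, await and complete are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for BlockOnGetMap, built on CompletableFuture
class PendingResponses {

    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Steps (iii) and (v): block until a response for id arrives; null on timeout or interrupt
    String await( long id, long timeoutMilli ) {
        CompletableFuture<String> future =
                pending.computeIfAbsent( id, key -> new CompletableFuture<>() );
        try {
            return future.get( timeoutMilli, TimeUnit.MILLISECONDS );
        }
        catch ( TimeoutException | ExecutionException e ) {
            return null;
        }
        catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
            return null;
        }
        finally {
            pending.remove( id );
        }
    }

    // Step (iv): called by the receiving thread when a response carrying id arrives
    void complete( long id, String response ) {
        pending.computeIfAbsent( id, key -> new CompletableFuture<>() ).complete( response );
    }

    public static void main( String[] args ) throws InterruptedException {
        PendingResponses responses = new PendingResponses();
        long id = 42L; // step (i): generate an ID
        // Step (ii) would send the UDP packet; here a thread plays the remote party
        Thread receiver = new Thread( () -> responses.complete( id, "pong" ) );
        receiver.start();
        System.out.println( responses.await( id, 2000 ) );
        receiver.join();
    }
}
```

The sketch works whether the response arrives before or after the caller starts waiting, because both sides create the future on demand.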
As UDP packets can get lost, we keep resending a packet if we do not receive a response within a certain time. This is why the get method of the BlockOnGetMap has a timeout.
However, the round trip will be long if every time we want to resend a packet we have to wait out the full send and response time. For this reason the sender often sends duplicates in machine-gun fashion: if one packet gets lost, maybe the second copy fired in quick succession will reach the destination.
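The resend strategy just described might look roughly like the sketch below. It is an illustration under assumptions, not the project's code: BurstSender, sendWithRetry and the parameters burstSize and maxAttempts are hypothetical, and the transport is passed in as plain functional interfaces so the example runs without a network.

```java
import java.util.function.LongFunction;

class BurstSender {

    // Sends burstSize duplicates back to back, then waits up to timeoutMilli for a
    // response. Repeats for at most maxAttempts rounds and returns null if none arrives.
    static String sendWithRetry( Runnable sendOnce, LongFunction<String> awaitResponse,
            int burstSize, int maxAttempts, long timeoutMilli ) {
        for ( int attempt = 0; attempt < maxAttempts; attempt++ ) {
            for ( int i = 0; i < burstSize; i++ ) {
                sendOnce.run(); // duplicate packets carrying the same ID
            }
            String response = awaitResponse.apply( timeoutMilli );
            if ( response != null ) {
                return response;
            }
        }
        return null;
    }

    public static void main( String[] args ) {
        int[] sent = { 0 };
        // Fake transport: pretend the first four packets are lost and the fifth gets through
        String result = sendWithRetry(
                () -> sent[0]++,
                timeout -> sent[0] >= 5 ? "ack" : null,
                3, 4, 100 );
        System.out.println( result + " after " + sent[0] + " packets" );
    }
}
```

In real use, sendOnce would write the datagram to the UDP channel and awaitResponse would call the get method of the BlockOnGetMap with the ID and the timeout.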
Note that the response packet can also get lost.
However, consider the case where the blocking get has already received its response but responses keep flowing in. As we sent duplicates in quick succession, we may also get responses in quick succession. We do not want those responses to live in the map forever.
We therefore do some spring cleaning: each put purges entries that have been in the map longer than a configured maximum age.
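A minimal sketch of such age-based purging is shown below. It is a simplified illustration rather than the BlockOnGetMap code: the class ExpiringValues is hypothetical, it uses one coarse lock, and the current time is passed in as a parameter purely so that the example is deterministic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class ExpiringValues {

    private final Map<Long, String> values = new HashMap<>();
    private final Map<Long, Long> insertedAtNanos = new HashMap<>();
    private final long maxAgeMilli;

    ExpiringValues( long maxAgeMilli ) {
        this.maxAgeMilli = maxAgeMilli;
    }

    // nowNanos would normally be System.nanoTime(); it is a parameter here for testability
    synchronized void put( long id, String value, long nowNanos ) {
        purgeExpired( nowNanos );
        values.put( id, value );
        insertedAtNanos.put( id, nowNanos );
    }

    // Removes every entry older than maxAgeMilli; the caller holds this object's lock
    private void purgeExpired( long nowNanos ) {
        Iterator<Map.Entry<Long, Long>> it = insertedAtNanos.entrySet().iterator();
        while ( it.hasNext() ) {
            Map.Entry<Long, Long> entry = it.next();
            long ageMilli = ( nowNanos - entry.getValue() ) / 1_000_000;
            if ( ageMilli > maxAgeMilli ) {
                values.remove( entry.getKey() );
                it.remove();
            }
        }
    }

    synchronized int size() {
        return values.size();
    }

    public static void main( String[] args ) {
        ExpiringValues map = new ExpiringValues( 3000 );
        map.put( 1L, "hello", 0L );
        map.put( 2L, "world", 0L );
        // 3.5 seconds later the next put purges the two stale entries first
        map.put( 3L, "Only this should remain", 3_500_000_000L );
        System.out.println( map.size() );
    }
}
```

Purging on put avoids a dedicated cleaner thread, at the cost that stale entries linger until the next put arrives.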
We use the low-level synchronization methods wait and notifyAll.
The get method adds an entry to a map. It then blocks on the key inside a synchronized block using the wait method, until another thread calls notifyAll on the same key. In the code the key we synchronize on is called the monitor.
Get thread:
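synchronized ( monitor ) {
    // some code deleted for readability
    // Loop until the value has arrived or the timeout has elapsed
    while ( ( value = valueMap.get( monitor ) ) == null ) {
        long diffMilli = ( System.nanoTime() - start ) / 1000000;
        long remainingMilli = timeoutMilli - diffMilli;
        if ( remainingMilli <= 0 ) {
            break; // timed out; wait( 0 ) would block forever
        }
        // Wait on the monitor we hold, not on k, which may be a different reference
        monitor.wait( remainingMilli );
    }
}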
Note that wait may return spuriously, without any notify and before the timeout, at the behest of the underlying platform, which sucks. I classify that as a sort of bug. For this reason we need to put the wait in a loop and keep waiting until a condition, such as the value having arrived or the timeout having elapsed, has been met. Higher-level concurrency utilities such as BlockingQueue and CountDownLatch do this looping for you. However, we are getting our fingers dirty here.
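For comparison, here is a sketch of the same guarded wait one step up from wait and notify, using ReentrantLock and Condition from java.util.concurrent.locks. The re-check loop is still there, but awaitNanos does the remaining-time arithmetic. The class TimedSlot and its methods take and offer are hypothetical names, and this is not how BlockOnGetMap is written.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedSlot<V> {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition arrived = lock.newCondition();
    private V value;

    // Blocks until a value is offered or timeoutMilli elapses; null on timeout or interrupt
    V take( long timeoutMilli ) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos( timeoutMilli );
        lock.lock();
        try {
            // Re-check the condition after every wakeup, spurious or not
            while ( value == null ) {
                if ( remainingNanos <= 0 ) {
                    return null;
                }
                // awaitNanos returns an estimate of the time still left to wait
                remainingNanos = arrived.awaitNanos( remainingNanos );
            }
            V result = value;
            value = null;
            return result;
        }
        catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
            return null;
        }
        finally {
            lock.unlock();
        }
    }

    // Stores the value and wakes every thread blocked in take
    void offer( V v ) {
        lock.lock();
        try {
            value = v;
            arrived.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    public static void main( String[] args ) {
        TimedSlot<String> slot = new TimedSlot<>();
        new Thread( () -> slot.offer( "hello" ) ).start();
        System.out.println( slot.take( 2000 ) );
    }
}
```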
Put thread:
synchronized ( monitor ) {
    valueMap.put( monitor, v );
    monitor.notifyAll();
}
The put method needs to dig out the key that the get method in another thread is blocked on, and call notifyAll on it.
Calling notifyAll on that same monitor releases the get thread blocked in wait.
There is a problem though. When you synchronize on a monitor in a block, both the thread doing the wait and the thread doing the notify must synchronize on the same object reference, not merely on two objects that are equal.
So for this reason we have a second map that stores just the keys:
Map<Key,Key> keyMap.
Remember that in the above use case the get thread generates an ID and sends it to another computer. That other computer sends back the same ID in its response, but the ID read from the response is a new object: equal to the original, yet a different reference. Both the get and put threads must nevertheless synchronize on the same reference.
This is why the put thread retrieves the original key reference from the key map and synchronizes on that.
To retrieve the key from the key map we use a Lock; for the waiting and notifying, however, we synchronize on the key itself.
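The difference between equal keys and the same reference can be shown with a short sketch. The class CanonicalKeys and its method monitorFor are hypothetical, and String keys are used only because new String guarantees a distinct object; a ConcurrentHashMap stands in for the Lock-protected key map of the real code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CanonicalKeys {

    private final Map<String, String> keyMap = new ConcurrentHashMap<>();

    // Returns the first reference ever registered for any key equal to k
    String monitorFor( String k ) {
        String existing = keyMap.putIfAbsent( k, k );
        return existing != null ? existing : k;
    }

    public static void main( String[] args ) {
        String sentId = new String( "id-42" );     // the object the get thread created
        String receivedId = new String( "id-42" ); // read from the response: equal, but a new object

        System.out.println( sentId.equals( receivedId ) ); // true
        System.out.println( sentId == receivedId );        // false

        CanonicalKeys keys = new CanonicalKeys();
        String getMonitor = keys.monitorFor( sentId );
        String putMonitor = keys.monitorFor( receivedId );
        System.out.println( getMonitor == putMonitor );    // true: safe to wait and notify on
    }
}
```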
So in summary, this is an example of a custom data structure that uses locks, synchronization, wait and notify for a specific use case: sending UDP packets and hole puncturing.
To get the code for this example:
git clone https://github.com/spotadev/java-examples.git
In both src/main/java and src/test/java navigate to this package:
com.javaspeak.java_examples.concurrency.custom.map.blockonget
You can run the testng unit test using a testng plugin for your IDE or you can run the main method of:
BlockOnGetMapTest
See: testng
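package com.javaspeak.java_examples.concurrency.custom.map.blockonget;

/**
 * @author John Dickerson - 15 Dec 2022
 */
public interface BlockOnGetMap<K, V> {

    // Blocks until a value is put for k or timeoutMilli elapses, in which case null is returned
    V get( K k, Integer timeoutMilli );

    // Stores v under k and wakes any thread blocked in get for the same key
    void put( K k, V v );

    // Number of values currently held in the map
    Integer getEntryCount();
}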
package com.javaspeak.java_examples.concurrency.custom.map.blockonget;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.testng.Assert;
import org.testng.TestListenerAdapter;
import org.testng.TestNG;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * @author John Dickerson - 15 Dec 2022
 */
public class BlockOnGetMapTest {

    private BlockOnGetMap<Long, String> blockOnGetMap;

    @BeforeClass
    public void setup() {
        // Entries may stay in the map for at most 3000 ms before being purged
        blockOnGetMap = new BlockOnGetMapImpl<Long, String>( 3000L );
    }

    @Test
    public void getAndPutTest() throws Exception {
        // The getter blocks for up to 4000 ms waiting for key 1
        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                String value = blockOnGetMap.get( 1L, 4000 );
                System.out.println( "got: " + value );
                return value;
            }
        };
        ExecutorService executorService = Executors.newFixedThreadPool( 2 );
        Future<String> future = executorService.submit( callable );
        // The putter supplies the value after 2000 ms
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep( 2000 );
                    blockOnGetMap.put( 1L, "hello" );
                    System.out.println( "Put" );
                }
                catch ( InterruptedException e ) {
                    System.out.println( "Was interrupted sleeping" );
                }
            }
        };
        executorService.submit( runnable );
        executorService.shutdown();
        String value = future.get();
        Assert.assertNotNull( value );
        Assert.assertEquals( value, "hello" );
    }

    @Test
    public void purgeTest() throws Exception {
        blockOnGetMap.put( 1L, "hello" );
        blockOnGetMap.put( 2L, "world" );
        // Sleep past the 3000 ms maximum age so the next put purges keys 1 and 2
        Thread.sleep( 3500 );
        blockOnGetMap.put( 3L, "Only this should remain" );
        Assert.assertEquals( blockOnGetMap.getEntryCount().intValue(), 1 );
    }

    public static void main( String[] args ) {
        TestListenerAdapter tla = new TestListenerAdapter();
        TestNG testng = new TestNG();
        testng.setTestClasses( new Class[] { BlockOnGetMapTest.class } );
        testng.addListener( tla );
        testng.run();
    }
}
package com.javaspeak.java_examples.concurrency.custom.map.blockonget;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * This class is to be used as follows:
 *
 * We have a request with an asynchronous response. We want to block waiting for the asynchronous
 * response.
 *
 * Here are the steps:
 *
 * (i) We create an ID.
 *
 * (ii) We make the request to a remote service and pass it the ID. The response from the remote
 * service is asynchronous and it will pass back the ID.
 *
 * (iii) We call BlockOnGetMap.get( ID ) and block on it.
 *
 * (iv) Another thread handles the asynchronous response, gets the ID and puts the response in
 * the BlockOnGetMap under that ID.
 *
 * (v) The method that was blocking on BlockOnGetMap now gets its value.
 *
 * The call doing the get then removes the key and value from the underlying maps.
 *
 * Note that a synchronized block is synchronizing on a monitor. It cares about the reference
 * of the object. That is why we have a second map so we can dig out the monitor.
 *
 * @author John Dickerson - 15 Dec 2022
 */
public class BlockOnGetMapImpl<K, V> implements BlockOnGetMap<K, V> {

    // Values by key. A ConcurrentHashMap because threads touch it while holding
    // different monitors; as a consequence values must not be null.
    private final Map<K, V> valueMap = new ConcurrentHashMap<>();

    // Maps a key to the one reference all threads synchronize on; guarded by the write lock
    private final Map<K, K> keyMap = new HashMap<>();

    // Time at which a key was first registered by put; guarded by the write lock
    private final Map<K, Long> nanoTimeMap = new HashMap<>();

    private final ReadWriteLock readWriteLockKey = new ReentrantReadWriteLock();
    private final Long maxMilliSecondsInMap;

    public BlockOnGetMapImpl( Long maxMilliSecondsInMap ) {
        this.maxMilliSecondsInMap = maxMilliSecondsInMap;
    }

    // Returns the reference to synchronize on for k, registering k itself if none exists yet
    private K getMonitorKey( K k ) {
        Lock writeLockKey = readWriteLockKey.writeLock();
        writeLockKey.lock();
        try {
            K monitor = keyMap.get( k );
            if ( monitor == null ) {
                monitor = k;
                keyMap.put( k, monitor );
            }
            return monitor;
        }
        finally {
            writeLockKey.unlock();
        }
    }

    private void removeKey( K k ) {
        Lock writeLockKey = readWriteLockKey.writeLock();
        writeLockKey.lock();
        try {
            keyMap.remove( k );
            nanoTimeMap.remove( k );
        }
        finally {
            writeLockKey.unlock();
        }
    }

    // Removes entries older than maxMilliSecondsInMap. The caller must hold the write lock.
    private void purgeExpiredKeys() {
        long now = System.nanoTime();
        List<K> keysToDelete = new ArrayList<>();
        for ( Entry<K, Long> entry : nanoTimeMap.entrySet() ) {
            long diff = ( now - entry.getValue() ) / 1000000;
            if ( diff > maxMilliSecondsInMap ) {
                keysToDelete.add( entry.getKey() );
            }
        }
        // Delete in a second pass so that nanoTimeMap is not modified while iterating over it
        for ( K k : keysToDelete ) {
            valueMap.remove( k );
            keyMap.remove( k );
            nanoTimeMap.remove( k );
            System.out.println( "Purged key: " + k );
        }
    }

    @Override
    public V get( K k, Integer timeoutMilli ) {
        K monitor = getMonitorKey( k );
        synchronized ( monitor ) {
            try {
                long start = System.nanoTime();
                V value;
                // Loop to guard against spurious wakeups: leave only when the value
                // has arrived or the timeout has elapsed
                while ( ( value = valueMap.get( monitor ) ) == null ) {
                    long diffMilli = ( System.nanoTime() - start ) / 1000000;
                    long remainingMilli = timeoutMilli - diffMilli;
                    if ( remainingMilli <= 0 ) {
                        break; // timed out; wait( 0 ) would block forever
                    }
                    // Wait on the monitor we hold, not on k, which may be a different reference
                    monitor.wait( remainingMilli );
                }
                return value;
            }
            catch ( InterruptedException e ) {
                System.out.println( "Interrupted" );
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return null;
            }
            finally {
                valueMap.remove( monitor );
                removeKey( k );
            }
        }
    }

    @Override
    public void put( K k, V v ) {
        K monitor;
        Lock writeLockKey = readWriteLockKey.writeLock();
        writeLockKey.lock();
        try {
            purgeExpiredKeys();
            monitor = keyMap.get( k );
            if ( monitor == null ) {
                // First sighting of this key: register it and timestamp it so that
                // a later put can purge it once it has expired
                monitor = k;
                keyMap.put( k, monitor );
                nanoTimeMap.put( k, System.nanoTime() );
            }
        }
        finally {
            writeLockKey.unlock();
        }
        // Store the value and wake any getter blocked on the same monitor
        synchronized ( monitor ) {
            valueMap.put( monitor, v );
            monitor.notifyAll();
        }
    }

    @Override
    public Integer getEntryCount() {
        return valueMap.size();
    }
}
Page Author: JD