This example of a ConcurrentMap implementation uses an array of Buckets, with linked lists of further Buckets leading off each array slot.
All Data Structures come down to basic building blocks:
Arrays
Linked Nodes
Retrieving data from an Array is very fast if you know which index to go to.
If you have a Key for the data you want to retrieve, there needs to be a way to map the key to an index of an Array. It turns out this is child's play and can be done using:
hashcode of the key
modulus calculation
In Java it is wise to override the hashCode() method in your key class (along with equals(), which must be consistent with it). If you do not provide a hashCode() method your key class will inherit the identity-based hashCode() method of Object.
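For example, a key class might override hashCode() and equals() like this (OrderKey is a hypothetical class used only for illustration, not part of the map implementation below):

import java.util.Objects;
public final class OrderKey {
private final String symbol;
private final long orderId;
public OrderKey( String symbol, long orderId ) {
this.symbol = symbol;
this.orderId = orderId;
}
// equals() must agree with hashCode(): equal keys must produce equal hashcodes
@Override
public boolean equals( Object other ) {
if ( this == other ) {
return true;
}
if ( !( other instanceof OrderKey ) ) {
return false;
}
OrderKey that = ( OrderKey ) other;
return orderId == that.orderId && symbol.equals( that.symbol );
}
@Override
public int hashCode() {
return Objects.hash( symbol, orderId );
}
}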
So imagine that calling hashcode() on your key class results in the number 1.
Now imagine that there is an array of 4 slots.
We perform a modulus calculation:
remainder = 1 % 4 = 1
This means that we should attempt to store the key with remainder 1 at index 1 of the array.
Note that if the hashcode was instead 5 the calculation would be:
remainder = 5 % 4 = 1
The % is the modulus operator; the modulus is basically the remainder. If you divide a number by another number and keep only whole numbers, you are left with a remainder. For example if you divide 13 by 4, you get 3 whole numbers and a remainder of 1. 1 becomes the index of the array.
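Putting the two steps together, the mapping from key to array index can be sketched as (bucketSize here stands for the length of the array):

int hashCode = key.hashCode();
// e.g. a hashcode of 13 with 4 slots gives index 13 % 4 = 1.
// Masking off the sign bit guards against negative hashcodes producing a negative index.
int arrayIndex = ( hashCode & 0x7FFFFFFF ) % bucketSize;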
Note that two different keys can end up with the same index - a collision. This is where we need to create a chain of buckets leading off the index.
This introduces the concept of buckets. Back to the example of an array of 4 slots: we put a Bucket in each slot.
[0][1][2][3]
|
[x]
Here we have a bucket at index 0, but as another key also mapped to index 0, we created a linked node leading to another bucket, denoted [x].
So in order to navigate through a linked list of buckets, each bucket needs to have an optionally populated child bucket. This child reference is what allows for chaining.
Let us now think about the structure of the Bucket itself.
We need to remember the Key and Value of a bucket. We could simply add the Key and Value as two separate fields of the bucket; however, we do NOT do that. Instead we create an optionally populated KeyValuePair object, holding both the Key and Value fields, and place it in the bucket. The reason why follows.
To understand why we create a KeyValuePair let me introduce the concept of CAS.
There is a feature in Java called CAS (Compare and Swap) - this feature is provided by the hardware. If you drill into the JDK and look at the CAS functionality you will see a JNI native call down to the hardware's compare-and-swap instruction.
So what is CAS?
CAS is a concurrency mechanism for replacing a value held in memory.
Its basic principle is that before trying to update the memory you first read the existing value held in memory. You then pass BOTH the old value and the new value you want to replace it with into the CAS function:
boolean wasSwapped = false;
while ( !wasSwapped ) {
oldValue = getOldValue();
wasSwapped = casReplace( oldValue, newValue );
}
Internally, the hardware-supported native function will only replace the value in memory with the new value if the current value is the same as the old value you passed in.
The current value will not be the same if some other thread replaced it while you were trying to replace it. So instead of using a Lock or a synchronized method or block, the idea of CAS is that you keep trying to replace the value in a loop until you succeed. Hence the loop in the above pseudo code.
This means that under MEDIUM contention between threads your CAS operation in the while loop will likely succeed faster than if you were using synchronized blocks, methods or locks.
However under HIGH contention between threads your code could continue looping for a long time and would be SLOWER than if you had used synchronized blocks, methods or locks.
Why are synchronized blocks slower than CAS operations under MEDIUM contention?
The reason that synchronized blocks, methods and locks are slower than CAS operations under MEDIUM contention is down to how threads are scheduled when you share state across them. While having multiple CPU Cores allows different threads to run on different Cores, what happens when you have more threads than Cores?
The answer is that the operating system has to do "Context Switching" - it needs to pause the current thread and unload its context so that it can load the context of another thread. This is VERY expensive. The more contended sharing of state you do between threads (in synchronized blocks) the more context switching is required and the more CPU cycles are used to do the switching.
Mechanical Sympathy
So we can reduce context switching by having the same number of threads as we have Cores. We can also try to code in such a way that we REDUCE context switching. To do that our code has to have Mechanical Sympathy. We can use CAS for mechanical sympathy; sizing thread pools to the core count, as sketched below, is another simple measure.
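For example, sizing a thread pool to the number of available cores is one way to keep context switching down (a sketch, not code from this example's repository):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CoreSizedPool {
public static void main( String[] args ) {
// One worker thread per core means threads rarely need to be descheduled
// to make room for each other, so the OS does less context switching
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool( cores );
for ( int i = 0; i < cores; i++ ) {
final int taskId = i;
executor.submit( () -> System.out.println( "running task " + taskId ) );
}
executor.shutdown();
}
}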
Another way to code for Mechanical Sympathy is to create virtual threads. To create virtual threads we can structure the work as event driven operations.
Event Driven Virtual Threads
If each of the operations is very small and fast we can simulate threads by using queues: a single thread consumes events off queues and adds them to other queues.
In fact this principle of event driven concurrency is used in the Java NIO package in the JDK. In Java NIO we have the concept of Selectors which respond to events. A Selector can be used for processing events from multiple UDP, TCP and Pipe channels, all from a SINGLE thread!
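A minimal sketch of that idea is below (the port number and class name are arbitrary; a real server would also register channels for OP_READ and OP_WRITE events):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class SingleThreadedSelectorSketch {
public static void main( String[] args ) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.bind( new InetSocketAddress( 9000 ) );
server.configureBlocking( false );
server.register( selector, SelectionKey.OP_ACCEPT );
while ( true ) {
// Blocks until at least one registered channel has an event ready
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while ( keys.hasNext() ) {
SelectionKey key = keys.next();
keys.remove();
if ( key.isAcceptable() ) {
// All accept events are handled here, on this single thread
SocketChannel client = server.accept();
if ( client != null ) {
client.close();
}
}
}
}
}
}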
We were talking about why we need a KeyValuePair object in the Bucket but had to pause the discussion to explain what CAS is.
Now that we know what CAS is, let us look at CAS implementations in the JDK.
We can use CAS-based counters like AtomicInteger for updating single values such as an Integer:
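import java.util.concurrent.atomic.AtomicInteger;
AtomicInteger counter = new AtomicInteger( 0 );
// The CAS retry loop written out by hand: re-read and retry until the swap succeeds
int oldValue;
do {
oldValue = counter.get();
} while ( !counter.compareAndSet( oldValue, oldValue + 1 ) );

In practice you would simply call counter.incrementAndGet(), which performs this retry loop internally.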
However here we want to update two values atomically: the key and the value.
For this purpose the JDK provides the CAS-based AtomicReference class:
AtomicReference<KeyValuePair<K, V>> keyValuePairReference =
new AtomicReference<>( startingKeyValuePair );
boolean wasSwapped = false;
while ( !wasSwapped ) {
KeyValuePair<K, V> oldKeyValuePair = keyValuePairReference.get();
wasSwapped = keyValuePairReference.compareAndSet(
oldKeyValuePair, newKeyValuePair );
}
This AtomicReference class allows us to swap a whole instance in one go, and that instance can hold multiple values.
In our ConcurrentMap design we have explained that we can have chained buckets leading off an index of an array. We have also explained that each Bucket contains a KeyValuePair which can be updated atomically.
Now we move to the question of deleting. In our implementation, deleting an entry only sets the Bucket's KeyValuePair to null; we do not delete the Bucket itself. The reason is that we are using CAS and want to keep everything atomic and fast without using synchronized methods, blocks or locks.
Note that we never delete the Buckets sitting directly in the array slots. However, the Buckets chained off those slot Buckets are no longer needed once their KeyValuePair has been set to null.
Spring Cleaning is the process of removing unused chained Buckets, triggered once a certain number of KeyValuePairs have been set to null. A bucket is considered unused if it is in a chain and its KeyValuePair has been set to null.
This Spring Cleaning is rather like a Stop the World Garbage Collection Pause.
The current implementation does not try to resize the array. This could be added to the implementation. However this custom ConcurrentMap does not provide functionality which is superior to the JDK's ConcurrentHashMap, so there is little point going to town on the Spring Cleaning.
However the idea of Spring Cleaning is a valuable concept when designing a data structure. Data Structures can be optimised for Read, Write or Delete, and the optimisation will depend on the usage pattern. Other data structures are designed as a compromise - not as fast as they could be for any one of Read, Write and Delete, but a good middle ground if you require consistency in performance. Trading platforms require consistency.
However if a trading platform's JVM is restarted frequently as a matter of policy then there would be little need to trigger Spring Cleaning, as long as performance remains acceptable. With zero Spring Cleaning you could be growing the memory footprint, which could eventually result in Out of Memory errors. Probably the best strategy for keeping performance maximised is to do small Spring Cleanings often.
The principle of Spring Cleaning is not too dissimilar to Garbage Collection.
To get the code for this example:
git clone https://github.com/spotadev/java-examples.git
In both src/main/java and src/test/java navigate to this package:
com.javaspeak.java_examples.concurrency.custom.map.concurrent
You can run the TestNG unit test using a TestNG plugin for your IDE or you can run the main method of:
ConcurrentMapTest
See: testng
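As a quick illustration of the API that the test exercises (the constructor arguments below are arbitrary):

ConcurrentMap<Long, String> map = new ConcurrentMapImpl<>( 17, 8 );
map.put( 1L, "one" );
System.out.println( map.getValue( 1L ) ); // prints: one
map.remove( 1L );
System.out.println( map.getValue( 1L ) ); // prints: null
System.out.println( map.debug() ); // dumps each index and its chain of buckets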
package com.javaspeak.java_examples.concurrency.custom.map.concurrent;
/**
* @author John Dickerson - 16 Dec 2022
*/
public interface Bucket<K, V> {
/**
* Gets the KeyValuePair. A KeyValuePair contains a Key and Value
*
* @return KeyValuePair
*/
public KeyValuePair<K, V> getKeyValuePair();
/**
* Sets the KeyValuePair. A KeyValuePair contains a Key and Value
*
* @param oldKeyValuePair
* @param newKeyValuePair
*
* @return true if managed to set, else false
*/
public boolean setKeyValuePairAtomically(
KeyValuePair<K, V> oldKeyValuePair, KeyValuePair<K, V> newKeyValuePair );
/**
* A Bucket can have child Buckets if more than one key is assigned the same array index.
*
* @return child Bucket of the parent Bucket
*/
public Bucket<K, V> getChildBucket();
/**
* Sets a child Bucket to the Bucket
*
* @param oldChildBucket
* @param newChildBucket
*
* @return true if managed to set, else false
*/
public boolean setChildBucketAtomically(
Bucket<K, V> oldChildBucket, Bucket<K, V> newChildBucket );
/**
* Sets a child Bucket to the Bucket
*
* @param childBucket
*/
public void setChildBucket( Bucket<K, V> childBucket );
/**
* Gets the array index which this Bucket is at. Note that several buckets may have the same
* index if they are chained.
*
* @return the index of this Bucket
*/
public int getIndex();
}
package com.javaspeak.java_examples.concurrency.custom.map.concurrent;
import java.util.concurrent.atomic.AtomicReference;
/**
* In the ConcurrentMap the buckets array is initialized with instances of BucketImpl.
*
* When an entry is being added to the ConcurrentMap, the hashCode() of the key is called and the
* resultant hash has a modulus performed on it against the number of buckets in the array:
*
* int hashCode = key.hashCode();
* int arrayIndex = hashCode % bucketSize;
*
* This maps a hashcode to an array index.
*
* The problem with this is that more than one hashcode may end up being assigned to the same
* arrayIndex. For example "BB" and "Aa" have the same hashcode which would result in them being
* stored in a chain of Buckets at the same array index.
*
* hashCode = "BB".hashCode() = 2112
* hashCode = "Aa".hashCode() = 2112
*
* If the number of buckets is 33 both "BB" and "Aa" would need to be stored at index 0:
*
* int arrayIndex = 2112 % 33 = 0
*
* If the range of the hashcodes is much bigger than the number of slots in the array we will
* also expect the modulus to give the same array index for different hashcodes. If many more
* keys are added to the map than there are array slots then we will expect the number of
* collisions to be even greater, as many different keys will have the same index.
*
* To get around this, further buckets can be chained to the bucket already in the arrayIndex.
* Each Bucket has a reference to a child bucket. If a new key translates to an index in the bucket
* array that already has a KeyValuePair in it then a new bucket is created and added as a child
* to the last bucket in the chain.
*
* Note that these chains make looking up a key slower so it is good to initialize the ConcurrentMap
* with a size which is bigger than the expected number of items you are putting in it.
*
* @author John Dickerson - 16 Dec 2022
*
* @param <K> Key of ConcurrentMap
* @param <V> Value of ConcurrentMap
*/
public class BucketImpl<K, V> implements Bucket<K, V> {
private int index;
private AtomicReference<KeyValuePair<K, V>> keyValuePairReference;
private AtomicReference<Bucket<K, V>> childBucketReference = new AtomicReference<>();
/**
* Constructor
*/
public BucketImpl() {
// Initialise the reference so that getKeyValuePair() does not NPE on an empty Bucket
keyValuePairReference = new AtomicReference<>();
}
/**
* Constructor
*
* @param index Index of buckets array that this Bucket resides in
*/
public BucketImpl( int index ) {
this.index = index;
keyValuePairReference = new AtomicReference<>();
}
/**
* @param key
* Key of ConcurrentMap
*
* @param value
* Value of ConcurrentMap
*
* @param index
* Index of buckets array that this Bucket resides in
*/
public BucketImpl( K key, V value, int index ) {
this.index = index;
KeyValuePair<K, V> keyValuePair = new KeyValuePair<>( key, value );
keyValuePairReference = new AtomicReference<>( keyValuePair );
}
@Override
public KeyValuePair<K, V> getKeyValuePair() {
return keyValuePairReference.get();
}
@Override
public boolean setKeyValuePairAtomically(
KeyValuePair<K, V> oldKeyValuePair,
KeyValuePair<K, V> newKeyValuePair ) {
return keyValuePairReference.compareAndSet(
oldKeyValuePair, newKeyValuePair );
}
@Override
public int getIndex() {
return index;
}
@Override
public Bucket<K, V> getChildBucket() {
return childBucketReference.get();
}
@Override
public boolean setChildBucketAtomically(
Bucket<K, V> oldChildBucket, Bucket<K, V> newChildBucket ) {
return childBucketReference.compareAndSet(
oldChildBucket, newChildBucket );
}
@Override
public void setChildBucket( Bucket<K, V> childBucket ) {
childBucketReference.set( childBucket );
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
KeyValuePair<K, V> keyValuePair = keyValuePairReference.get();
sb.append( keyValuePair == null ? "null keyValuePair" : keyValuePair.toString() );
sb.append( " ==> " );
Bucket<K, V> childBucket = childBucketReference.get();
sb.append( childBucket == null ? "null bucket" : childBucket.toString() );
return sb.toString();
}
}
package com.javaspeak.java_examples.concurrency.custom.map.concurrent;
/**
* KeyValuePair encapsulates a Key Value pair.
*
* It is set atomically into BucketImpl
*
* @author John Dickerson - 16 Dec 2022
*
* @param <K> Key of ConcurrentMap
* @param <V> Value of ConcurrentMap
*/
public class KeyValuePair<K, V> {
private final K key;
private final V value;
public KeyValuePair( K key, V value ) {
this.key = key;
this.value = value;
}
public K getKey() {
return this.key;
}
public V getValue() {
return this.value;
}
@Override
public String toString() {
return "KeyValuePair [key=" + key + ", value=" + value + "]";
}
}
package com.javaspeak.java_examples.concurrency.custom.map.concurrent;
/**
* This ConcurrentMap does not use synchronized blocks for put(..), getValue(..) and remove(..)
* methods during the major part of its operation. Instead it uses CAS for put and remove.
* <p>
* This ConcurrentMap does however have housekeeping operations which use hard synchronization
* (synchronized blocks) to do cleanups when certain thresholds are reached.
* <p>
* The ConcurrentMap bucket array is pre-populated with BucketImpl instances.
* <p>
* When an entry is being added, the hashCode() of the key is called and the resultant hash has a
* modulus performed on it against the number of buckets in the array:
* <p>
* int hashCode = key.hashCode();<p>
* int arrayIndex = hashCode % bucketSize;
* <p>
* This maps a hashcode to an array index. The problem with this is that more than one hashcode may
* end up being assigned to the same arrayIndex. For example "BB" and "Aa" have the same hashcode
* which would result in them being stored in a chain of Buckets at the same array index.
* <p>
* hashCode = "BB".hashCode() = 2112<p>
* hashCode = "Aa".hashCode() = 2112
* <p>
* If the number of buckets is 33 both "BB" and "Aa" would need to be stored at index 0:
* <p>
* int arrayIndex = 2112 % 33 = 0
* <p>
* If we are reducing bigger hashcode numbers to smaller array index numbers using the modulus,
* then it is very possible that different hashcodes will also produce the same modulus.
* <p>
* To get around this, further buckets can be chained to the bucket already in the arrayIndex.
* Each Bucket has a reference to a child bucket. If a new key translates to an index in the
* bucket array that already has a KeyValuePair in it then a new bucket is created and added as a
* child to the last bucket in the chain.
* <p>
* The method, put( K key, V value ) uses CAS functionality to update child bucket references in
* Buckets and to update a Bucket with a KeyValuePair.
* <p>
* If there is already a Bucket in the chain of Buckets connected to a certain array index that has
* a null KeyValuePair then that bucket will be used for a new keyValuePair. If there are no
* null KeyValuePairs in the buckets chained to a certain index then a new Bucket is created at the
* end of the bucket chain and the KeyValuePair set in that new Bucket.
* <p>
* The method, getValue( K key ) uses CAS to read the child bucket and KeyValuePair
* <p>
* The method, remove( K key ) uses CAS functionality to update the KeyValuePair of a bucket
* with null. The remove method does not delete the Bucket itself, for that would require more
* than a simple CAS operation, which would affect performance. What this means is that if
* objects are added to the map which have the same arrayIndex and they are then removed, we
* could end up with buckets in the chain of buckets leading off an index which have no
* KeyValuePairs. This would make get() operations slightly longer as we would needlessly be
* traversing elements in a linked list which had no values in them.
* <p>
* There is however a configuration parameter called "numberOfKeyValuePairsDeletedBeforeHouseKeeping"
* which on reaching the threshold triggers a house keeping process which removes all chained
* Buckets which have null KeyValuePairs in them. If there is a bucket chain A -> B -> C then
* removing B would result in A being mapped to C: A -> C. During this house keeping exercise
* all calls to put(..) and getValue(..) block until the house keeping is finished.
* <p>
* The advantage of this implementation is that the cost of heavy weight synchronization is reduced
* to occasional house keeping exercises. All other cases of synchronization use CAS which is more
* performant.
* <p>
* @author John Dickerson - 16 Dec 2022
*
* @param <K> Key we wish to put in the Map
* @param <V> Value we wish to put in the Map
*/
public interface ConcurrentMap<K, V> {
/**
* This method uses CAS instead of heavy synchronization
*
* @param key Key wish to put in the Map
*
* @param value Value wish to put in the Map
*/
public void put( K key, V value );
/**
* This method avoids heavy synchronization; it only blocks while house keeping is running
*
* @param key Key wish to retrieve the value with
*
* @return V
* value
*/
public V getValue( K key );
/**
* This method uses CAS instead of heavy synchronization
*
* @param key
* Key wish to remove the value with
*/
public void remove( K key );
/**
* Debugging the contents of the internal array and its chained buckets
*
* @return a String showing the contents of the internal array and its chained buckets
*/
public String debug();
}
package com.javaspeak.java_examples.concurrency.custom.map.concurrent;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Implementation of {@link ConcurrentMap}. See the interface javadoc for a full description of
* the bucket array, bucket chaining, CAS usage and the house keeping (spring cleaning) process.
*
* @author John Dickerson - 16 Dec 2022
*
* @param <K> Key we wish to put in the Map
* @param <V> Value we wish to put in the Map
*/
public class ConcurrentMapImpl<K, V> implements ConcurrentMap<K, V> {
// Used for Spring Cleaning
private Queue<Bucket<K, V>> bucketsToDeleteConcurrentLinkedQueue;
private Bucket<K, V>[] buckets;
private int bucketSize;
private int numberOfKeyValuePairsDeletedBeforeHouseKeeping;
private AtomicInteger numberDeleted = new AtomicInteger( 0 );
private Object removeLock = new Object();
private AtomicBoolean removeBoolean = new AtomicBoolean( false );
private AtomicBoolean putBoolean = new AtomicBoolean( false );
/**
* Finds last Bucket in the Linked List. Each index of the buckets array is pre-populated with
* a BucketImpl. Each Bucket in the array can have a child Bucket and so can the child Buckets
* have child buckets themselves.
* <p>
* This method finds the last Bucket in the linked list of buckets.
*
* @param bucket
*
* @return last child bucket in chain of buckets
*/
private Bucket<K, V> getLastBucket( Bucket<K, V> bucket ) {
Bucket<K, V> childBucket = bucket.getChildBucket();
if ( childBucket == null ) {
return bucket;
}
else {
return getLastBucket( childBucket );
}
}
/**
* A bucket can optionally have a KeyValuePair. If it has no KeyValuePair that means the bucket
* is not being used at the moment.
* <p>
* If a bucket with no KeyValuePair is one of the child buckets chained off a bucket which is
* in the array of buckets, then it is unused and may be removed by house keeping if the
* number of deleted buckets passes the threshold defined by
* numberOfKeyValuePairsDeletedBeforeHouseKeeping
* <p>
* This method looks at the bucket passed in to see if it has a KeyValuePair for the key passed
* in.
* <p>
* If it does not it will recursively look at the child buckets to see if they have a
* KeyValuePair corresponding to the Key.
* <p>
* If a KeyValuePair is found which corresponds to the key, the value for that KeyValuePair is
* returned. If no KeyValuePair is found null is returned.
* <p>
* @param bucket
* Bucket to look for KeyValuePairs recursively until one is found which matches the key
*
* @param key
* the key to match KeyValuePairs with
*
* @return value of matched KeyValuePair
*/
private V getValue( Bucket<K, V> bucket, K key ) {
KeyValuePair<K, V> keyValuePair = bucket.getKeyValuePair();
if ( keyValuePair != null ) {
if ( keyValuePair.getKey().equals( key ) ) {
return keyValuePair.getValue();
}
}
Bucket<K, V> childBucket = bucket.getChildBucket();
if ( childBucket != null ) {
return getValue( childBucket, key );
}
else {
return null;
}
}
/**
* BucketImpl has a getChildBucket() method which can reference a child BucketImpl.
* <p>
* Consequently Buckets can be chained. The reason buckets are chained is that if one or more
* keys have the same hashcode, or one or more keys have the same modulus of the hashcode, then
* they will be assigned to the same index in the array. Obviously only one value can be
* assigned to one slot in the array so to get around this we allow the BucketImpls to be
* chained.
* <p>
* Searching through chained BucketImpls is slower than going directly to the array index, but
* if there are not too many of them then it does not impact performance significantly.
* <p>
* This method will look in the bucket and recursively at its children to find a bucket which
* has a KeyValuePair whose Key matches the key of this method.
*
* @param bucket
* The parent bucket to look for the key in itself or its children
*
* @param key
* the key to look for in the buckets
*
* @return The Bucket who has a KeyValuePair which matches the key of this method
*/
private Bucket<K, V> getBucket( Bucket<K, V> bucket, K key ) {
KeyValuePair<K, V> keyValuePair = bucket.getKeyValuePair();
if ( keyValuePair != null ) {
if ( keyValuePair.getKey().equals( key ) ) {
return bucket;
}
}
Bucket<K, V> childBucket = bucket.getChildBucket();
if ( childBucket != null ) {
return getBucket( childBucket, key );
}
else {
return null;
}
}
/**
* This method looks at the buckets children recursively to find the first child bucket which
* has a null KeyValuePair. This Bucket can then be reused instead of creating a new one and
* adding it on the end of the chain.
*
* @param bucket
* Parent Bucket to look for the first child recursively that has a null KeyValuePair
*
* @return Return first child Bucket that has a null KeyValuePair
*/
private Bucket<K, V> getFreeChildBucket( Bucket<K, V> bucket ) {
Bucket<K, V> childBucket = bucket.getChildBucket();
if ( childBucket != null ) {
if ( childBucket.getKeyValuePair() == null ) {
return childBucket;
}
else {
return getFreeChildBucket( childBucket );
}
}
return null;
}
/**
* If more KeyValuePairs are deleted than the threshold denoted by
* numberOfKeyValuePairsDeletedBeforeHouseKeeping, then a remove will trigger a spring cleaning
* exercise to tidy up chains of Buckets which have had elements deleted in them.
* <p>
* The reason this cleanup is necessary is that calling remove on a key nullifies the
* KeyValuePair of the bucket in question without deleting the bucket itself. The reason the
* bucket is not deleted is that deleting it could necessitate non-CAS hard synchronization,
* as several bucket references could need to be updated in the chain in one atomic
* transaction.
* <p>
* Imagine the chain array:
* <p>
* Index 0 : Aa:Apple ==> [null keyValuePair] ==> BB:Bat ==> null bucket
* <p>
* Deleting the bucket denoted [null keyValuePair] would involve changing Aa:Apple so it
* references BB:Bat. If another thread removed BB:Bat and another thread added a new bucket
* to [null keyValuePair] it would be missed from the chain. This is why hard synchronization
* would be required to block any threads putting or getting while a remove was taking place.
* <p>
* To avoid the expensive hard synchronization on the remove we instead just nullify the
* KeyValuePair and add a reference to the nullified bucket to
* bucketsToDeleteConcurrentLinkedQueue so that later on we can block all threads while we do
* a proper clean up.
* <p>
* The principle is not too dissimilar to garbage collection
*/
private void springClean() {
Bucket<K, V> bucketToRemove;
while ( ( bucketToRemove = bucketsToDeleteConcurrentLinkedQueue.poll() ) != null ) {
if ( bucketToRemove.getKeyValuePair() == null ) {
removeBucket( bucketToRemove );
}
}
}
/**
* This method is called by springClean(). It is called while put and get methods are blocked
* by springClean(). springClean() does not occur on each removal of a key and value from the
* ConcurrentMap. The springClean() occurs after a certain number of removes have been called,
* denoted by numberOfKeyValuePairsDeletedBeforeHouseKeeping.
* <p>
* The springClean() means that put, get and remove methods all run without heavyweight
* synchronization most of the time. Instead CAS is used to ensure state changes are visible
* across all threads.
*
* @param bucketToRemove
*/
private void removeBucket( Bucket<K, V> bucketToRemove ) {
Bucket<K, V> parentBucket = buckets[bucketToRemove.getIndex()];
if ( parentBucket != null ) {
if ( parentBucket.getKeyValuePair() == null ) {
Bucket<K, V> nextBucket = parentBucket.getChildBucket();
if ( nextBucket != null ) {
buckets[parentBucket.getIndex()] = nextBucket;
}
}
}
while ( parentBucket != null ) {
Bucket<K, V> nextBucket = parentBucket.getChildBucket();
if ( nextBucket != null ) {
if ( nextBucket.getKeyValuePair() == null ) {
Bucket<K, V> afterNextBucket = nextBucket.getChildBucket();
parentBucket.setChildBucket( afterNextBucket );
}
}
parentBucket = nextBucket;
}
}
/**
* Constructor
*
* @param bucketSize
* The number of slots in this Map. Note that if we try to place more items in the map
* than there are slots, chaining is certain to occur, which can degrade performance if
* there is significant chaining. It is best to use a prime number for the bucket size to
* minimise the amount of chaining.
*
* @param numberOfKeyValuePairsDeletedBeforeHouseKeeping
* This threshold determines how many KeyValuePairs can be deleted before houseKeeping is
* triggered. House Keeping removes child buckets which have had their KeyValuePairs
* nullified.
*/
@SuppressWarnings( "unchecked" )
public ConcurrentMapImpl(
int bucketSize, int numberOfKeyValuePairsDeletedBeforeHouseKeeping ) {
this.bucketSize = bucketSize;
this.numberOfKeyValuePairsDeletedBeforeHouseKeeping =
numberOfKeyValuePairsDeletedBeforeHouseKeeping;
buckets = new Bucket[bucketSize];
bucketsToDeleteConcurrentLinkedQueue = new ConcurrentLinkedQueue<Bucket<K, V>>();
for ( int i = 0; i < bucketSize; i++ ) {
buckets[i] = new BucketImpl<K, V>( i );
}
}
/**
* Constructor
*/
public ConcurrentMapImpl() {
this( 33, 10 );
}
private void putValue( K key, V value ) {
putBoolean.set( true );
int hashCode = key.hashCode();
// Mask off the sign bit so a negative hashCode cannot produce a negative array index
int arrayIndex = ( hashCode & 0x7FFFFFFF ) % bucketSize;
while ( true ) {
KeyValuePair<K, V> keyValuePair = buckets[arrayIndex].getKeyValuePair();
// If the parent bucket in the index has a Null KeyValuePair then set the new
// KeyValuePair in it
if ( keyValuePair == null ) {
if ( buckets[arrayIndex].setKeyValuePairAtomically(
keyValuePair, new KeyValuePair<K, V>( key, value ) ) ) {
break;
}
}
else {
// Navigate down the chain of buckets starting at the child of the parent bucket
// at the index and return the first bucket that has a null KeyValuePair. If
// a Bucket is returned that means we are reusing an old bucket.
Bucket<K, V> freeBucket = getFreeChildBucket( buckets[arrayIndex] );
if ( freeBucket != null ) {
if ( freeBucket.setKeyValuePairAtomically(
null, new KeyValuePair<K, V>( key, value ) ) ) {
numberDeleted.getAndDecrement();
break;
}
}
else {
// We did not find any unused buckets so we get the last bucket in the chain
// and add a new bucket to it.
Bucket<K, V> lastBucket = getLastBucket( buckets[arrayIndex] );
if ( lastBucket.setChildBucketAtomically(
null, new BucketImpl<K, V>(
key, value, arrayIndex ) ) ) {
break;
}
}
}
}
putBoolean.set( false );
}
@Override
public void put( K key, V value ) {
// Wait for any in-progress house keeping (spring cleaning) to finish before writing
synchronized ( removeLock ) {
while ( removeBoolean.get() ) {
try {
removeLock.wait();
}
catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
return;
}
}
}
putValue( key, value );
}
@Override
public V getValue( K key ) {
while ( true ) {
// Wait for any in-progress house keeping (spring cleaning) to finish before reading
synchronized ( removeLock ) {
while ( removeBoolean.get() ) {
try {
removeLock.wait();
}
catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
return null;
}
}
}
int hashCode = key.hashCode();
// Mask off the sign bit so a negative hashCode cannot produce a negative array index
int arrayIndex = ( hashCode & 0x7FFFFFFF ) % bucketSize;
V v = getValue( buckets[arrayIndex], key );
// If house keeping started while we were reading, retry; otherwise the read is valid
if ( !removeBoolean.get() ) {
return v;
}
}
}
@Override
public void remove( K key ) {
int hashCode = key.hashCode();
// Mask off the sign bit so a negative hashCode cannot produce a negative array index
int arrayIndex = ( hashCode & 0x7FFFFFFF ) % bucketSize;
while ( true ) {
Bucket<K, V> bucket = getBucket( buckets[arrayIndex], key );
if ( bucket == null ) {
return;
}
if ( bucket.setKeyValuePairAtomically( bucket.getKeyValuePair(), null ) ) {
bucketsToDeleteConcurrentLinkedQueue.add( bucket );
break;
}
}
// Trigger house keeping once the number of deletions reaches the threshold
if ( numberDeleted.incrementAndGet() >= numberOfKeyValuePairsDeletedBeforeHouseKeeping ) {
synchronized ( this ) {
if ( numberDeleted.get() >= numberOfKeyValuePairsDeletedBeforeHouseKeeping ) {
try {
removeBoolean.set( true );
springClean();
numberDeleted.set( 0 );
}
finally {
removeBoolean.set( false );
// Wake up any readers and writers blocked while house keeping ran
synchronized ( removeLock ) {
removeLock.notifyAll();
}
}
}
}
}
}
@Override
public String debug() {
StringBuilder sb = new StringBuilder( "\n" );
for ( int i = 0; i < buckets.length; i++ ) {
sb.append( "Index " ).append( i ).append( " : " );
sb.append( buckets[i].toString() ).append( "\n" );
}
return sb.toString();
}
}
package com.javaspeak.java_examples.concurrency.custom.map.concurrent;
import org.testng.Assert;
import org.testng.TestListenerAdapter;
import org.testng.TestNG;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
* @author John Dickerson - 16 Dec 2022
*/
public class ConcurrentMapTest {
private static final String EL = "\n";
private ConcurrentMap<Long, String> concurrentMap;
@BeforeMethod
public void setUp() {
int bucketSize = 4;
int numberOfKeyValuePairsDeletedBeforeHouseKeeping = 2;
concurrentMap =
new ConcurrentMapImpl<Long, String>(
bucketSize,
numberOfKeyValuePairsDeletedBeforeHouseKeeping );
}
@Test
public void putOnDifferentIndexTest() {
concurrentMap.put( 0l, "aa" );
concurrentMap.put( 1l, "bb" );
String expected =
EL +
"Index 0 : KeyValuePair [key=0, value=aa] ==> null bucket" + EL +
"Index 1 : KeyValuePair [key=1, value=bb] ==> null bucket" + EL +
"Index 2 : null keyValuePair ==> null bucket" + EL +
"Index 3 : null keyValuePair ==> null bucket" + EL;
String debug = concurrentMap.debug();
System.out.println( debug );
Assert.assertEquals( debug, expected );
Assert.assertEquals( concurrentMap.getValue( 0l ), "aa" );
Assert.assertEquals( concurrentMap.getValue( 1l ), "bb" );
}
@Test
public void putOnSameIndexTest() {
concurrentMap.put( 0l, "aa" );
concurrentMap.put( 4l, "bb" );
String expected =
EL +
"Index 0 : KeyValuePair [key=0, value=aa] ==> KeyValuePair [key=4, value=bb] ==> null bucket" + EL +
"Index 1 : null keyValuePair ==> null bucket" + EL +
"Index 2 : null keyValuePair ==> null bucket" + EL +
"Index 3 : null keyValuePair ==> null bucket" + EL;
String debug = concurrentMap.debug();
System.out.println( debug );
Assert.assertEquals( debug, expected );
Assert.assertEquals( concurrentMap.getValue( 0l ), "aa" );
Assert.assertEquals( concurrentMap.getValue( 4l ), "bb" );
}
@Test
public void putOnSameIndexAndDeleteFirstTest() {
concurrentMap.put( 0l, "aa" );
concurrentMap.put( 4l, "bb" );
concurrentMap.remove( 0l );
String expected =
EL +
"Index 0 : null keyValuePair ==> KeyValuePair [key=4, value=bb] ==> null bucket"
+ EL +
"Index 1 : null keyValuePair ==> null bucket" + EL +
"Index 2 : null keyValuePair ==> null bucket" + EL +
"Index 3 : null keyValuePair ==> null bucket" + EL;
String debug = concurrentMap.debug();
System.out.println( debug );
Assert.assertEquals( debug, expected );
Assert.assertNull( concurrentMap.getValue( 0l ) );
Assert.assertEquals( concurrentMap.getValue( 4l ), "bb" );
}
@Test
public void triggerSpringCleanTest() {
concurrentMap.put( 0l, "aa" );
concurrentMap.put( 4l, "bb" );
concurrentMap.remove( 0l );
concurrentMap.remove( 4l );
String expected =
EL +
"Index 0 : null keyValuePair ==> null bucket" + EL +
"Index 1 : null keyValuePair ==> null bucket" + EL +
"Index 2 : null keyValuePair ==> null bucket" + EL +
"Index 3 : null keyValuePair ==> null bucket" + EL;
String debug = concurrentMap.debug();
System.out.println( debug );
Assert.assertEquals( debug, expected );
Assert.assertNull( concurrentMap.getValue( 0l ) );
Assert.assertNull( concurrentMap.getValue( 4l ) );
}
public static void main( String[] args ) {
TestListenerAdapter tla = new TestListenerAdapter();
TestNG testng = new TestNG();
testng.setTestClasses( new Class[] { ConcurrentMapTest.class } );
testng.addListener( tla );
testng.run();
}
}
Page Author: JD