Atomic
Older versions: 2.x
Scala is awesome at handling concurrency and parallelism, providing high-level tools for handling it, however sometimes you need to go lower level. Java’s library provides all the multi-threading primitives required, however the interfaces of these primitives sometime leave something to be desired.
One such example are the atomic references provided in java.util.concurrent.atomic package. This project is an attempt at improving these types for daily usage.
Providing a Common interface #
So you have
j.u.c.a.AtomicReference<V>,
j.u.c.a.AtomicInteger,
j.u.c.a.AtomicLong
and
j.u.c.a.AtomicBoolean.
The reason why AtomicReference<V>
does not suffice is because
compare-and-set works with reference equality, not structural equality
like it happens with primitives. So you cannot simply box an integer
and use it safely, plus you’ve got the whole boxing/unboxing overhead.
One problem is that all of these classes do not share a common interface and there’s no reason for why they shouldn’t.
import monix.execution.atomic._
val refInt1: Atomic[Int] = Atomic(0)
val refInt2: AtomicInt = Atomic(0)
val refLong1: Atomic[Long] = Atomic(0L)
val refLong2: AtomicLong = Atomic(0L)
val refString1: Atomic[String] = Atomic("hello")
val refString2: AtomicAny[String] = Atomic("hello")
Working with Numbers #
One really common use-case for atomic references are for numbers to
which you need to add or subtract. To this purpose
j.u.c.a.AtomicInteger
and j.u.c.a.AtomicLong
have an
incrementAndGet
helper. However Ints and Longs aren’t the only types
you normally need. How about Float
and Double
and Short
? How about
BigDecimal
and BigInt
?
In Scala, thanks to the Numeric[T] type-class, we can do this:
val ref = Atomic(BigInt(1))
// ref: AtomicNumberAny[BigInt] = monix.execution.atomic.AtomicNumberAny@511e8b81
// now we can increment a BigInt
ref.incrementAndGet()
// res0: BigInt = 2
// or adding to it another value
ref.addAndGet(BigInt("329084291234234"))
// res1: BigInt = 329084291234236
But then if we have a type that isn’t a number:
val string = Atomic("hello")
Trying to apply numeric operations will of course fail:
string.incrementAndGet()
// error: value incrementAndGet is not a member of monix.execution.atomic.AtomicAny[String]
Support for Other Primitives (Float, Double, Short, Char, Byte) #
Here’s a common gotcha with Java’s AtomicReference<V>
. Suppose
we’ve got this Java atomic:
import java.util.concurrent.atomic.AtomicReference
val ref = new AtomicReference(0.0)
The unexpected happens on compareAndSet
:
val isSuccess = ref.compareAndSet(0.0, 100.0)
// isSuccess: Boolean = false
Calling compareAndSet
fails because when using AtomicReference<V>
the equality comparison is done by reference and it doesn’t work for
primitives because the process of
Autoboxing/Unboxing
is involved. And then there’s the efficiency issue. By using an
AtomicReference, you’ll end up with extra boxing/unboxing going on.
Float
can be stored inside an AtomicInteger
by using Java’s
Float.floatToIntBits
and Float.intBitstoFloat
. Double
can be
stored inside an AtomicLong
by using Java’s
Double.doubleToLongBits
and Double.longBitsToDouble
. Char
,
Byte
and Short
can be stored inside an AtomicInteger
as well,
with special care to handle overflows correctly. All this is done to avoid boxing
for performance reasons.
val ref = Atomic(0.0)
// ref: AtomicDouble = monix.execution.atomic.AtomicDouble@75c1848a
ref.compareAndSet(0.0, 100.0)
// res3: Boolean = true
ref.incrementAndGet()
// res4: Double = 101.0
val ref2 = Atomic('a')
// ref2: AtomicChar = monix.execution.atomic.AtomicChar@2298f87c
ref2.incrementAndGet()
// res5: Char = 'b'
ref2.incrementAndGet()
// res6: Char = 'c'
Common Pattern: Loops for Transforming the Value #
incrementAndGet
represents just one use-case of a simple and more
general pattern. To push items in a queue for example, one would
normally do something like this in Java:
import collection.immutable.Queue
import java.util.concurrent.atomic.AtomicReference
def pushElementAndGet[T <: AnyRef, U <: T]
(ref: AtomicReference[Queue[T]], elem: U): Queue[T] = {
var continue = true
var update = null
while (continue) {
var current: Queue[T] = ref.get()
var update = current.enqueue(elem)
continue = !ref.compareAndSet(current, update)
}
update
}
This is such a common pattern. Taking a page from the wonderful
ScalaSTM,
with Atomic
you can simply do this:
val ref = Atomic(Queue.empty[String])
// ref: AtomicAny[Queue[String]] = monix.execution.atomic.AtomicAny@5b69bde0
// Transforms the value and returns the update
ref.transformAndGet(_.enqueue("hello"))
// res7: Queue[String] = Queue("hello")
// Transforms the value and returns the current one
ref.getAndTransform(_.enqueue("world"))
// res8: Queue[String] = Queue("hello")
// We can be specific about what we want extracted as a result
ref.transformAndExtract { current =>
val (result, update) = current.dequeue
(result, update)
}
// res9: String = "hello"
// Or the shortcut, because it looks so good
ref.transformAndExtract(_.dequeue)
// res10: String = "world"
Voilà, you now have a concurrent, thread-safe and non-blocking Queue. You can do this for whatever persistent data-structure you want.
NOTE: the transform methods are implemented using Scala macros, so you get zero overhead by using them.
Scala.js support for targeting Javascript #
These atomic references are also cross-compiled to Scala.js for targeting Javascript engines, because:
- it’s a useful way of boxing mutable variables, in case you need to box
- it’s a building block for doing synchronization, so useful for code that you want cross-compiled
- because mutability doesn’t take time into account and
compareAndSet
does, atomic references andcompareAndSet
in particular is also useful in a non-multi-threaded / asynchronous environment
Efficiency #
Atomic references are low-level primitives for concurrency and because of that any extra overhead is unacceptable.
Boxing / Unboxing #
Working with a common Atomic[T]
interface implies boxing/unboxing of
primitives. This is why the constructor for atomic references always
returns the most specialized version, as to avoid boxing and unboxing:
val ref1 = Atomic(1)
// ref1: AtomicInt = AtomicInt(1)
val ref2 = Atomic(1L)
// ref2: AtomicLong = AtomicLong(1)
val ref3 = Atomic(true)
// ref3: AtomicBoolean = AtomicBoolean(true)
val ref4 = Atomic("")
// ref4: AtomicAny[String] = monix.execution.atomic.AtomicAny@5e61eeb5
Increments/decrements are done by going through the
Numeric[T]
provided implicit, but only for AnyRef
types, such as BigInt
and
BigDecimal
. For Scala’s primitives the logic has been optimized to
bypass Numeric[T]
.
Cache-padded versions for avoiding the false sharing problem #
In order to reduce cache contention, cache-padded versions for all Atomic classes are provided. For reference on what that means, see:
- mail.openjdk.java.net/pipermail/hotspot-dev/2012-November/007309.html
- JEP 142: Reduce Cache Contention on Specified Fields
To use the cache-padded versions, you need to override the default
PaddingStrategy
:
import monix.execution.atomic.PaddingStrategy.{Left64, LeftRight256}
// Applies padding to the left of the value for a cache line
// of 64 bytes
val ref1 = Atomic.withPadding(1, Left64)
// Applies padding both to the left and the right of the value
// for a total object size of at least 256 bytes
val ref2 = Atomic.withPadding(1, LeftRight256)
The strategies available are:
NoPadding
: doesn’t apply any padding, the defaultLeft64
: applies padding to the left of the value, for a cache line of 64 bytesRight64
: applies padding to the right of the value, for a cache line of 64 bytesLeftRight128
: applies padding to both the left and the right, for a cache line of 128 bytesLeft128
: applies padding to the left of the value, for a cache line of 128 bytesRight128
: applies padding to the right of the value, for a cache line of 128 bytesLeftRight256
: applies padding to both the left and the right, for a cache line of 256 bytes
And now you can join the folks that have mechanical sympathy :-P
Platform Intrinsics #
Java 8 came with platform intrinsics, such that:
- Issue
JDK-7023898
changed the
getAndAdd
method inUnsafe
and all related methods in theAtomicInt
andAtomicLong
implementations, likegetAndIncrement
andincrementAndGet
, to be translated toLOCK XADD
instructions on x86/x64 platforms, being far cheaper than CAS loops based onLOCK CMPXCHG
(normalcompareAndSet
) - Issue
JDK-8004330
changed the
getAndSet
inUnsafe
and all atomic implementations to be translated toLOCK XCHG
, which is also cheaper than CAS loops based onLOCK CMPXCHG
(normalcompareAndSet
). See this article by Dave Dice for why this is awesome
Monix’s Atomic
implementations are also using the same platform
intrinsics when running on top of Java 8, but automatically fallback
to normal compareAndSet
loops if running on top of Java 6 or 7.
So when you do this:
val numberRef = Atomic(0)
// numberRef: AtomicInt = AtomicInt(2)
val previous = numberRef.getAndSet(1)
// previous: Int = 0
val current = numberRef.incrementAndGet()
// current: Int = 2
This code, depending on the Java version used will either use optimized CPU instructions (Java 8 and above) or fallback to CAS loops (e.g. Java 6 and 7, Android).