Cancelable
Older versions: 2.x
A one-time idempotent action that can be used to cancel async
computations, or to release resources that active data-sources are
holding, it is the equivalent of java.io.Closable
, but without the
I/O focus, or to IDisposable
from .NET, or to
akka.actor.Cancellable
(but with an l
dropped :)).
Base Cancelable #
package monix.execution
trait Cancelable {
def cancel(): Unit
}
The contract for well-behaved Cancelable
instances:
cancel
is idempotent, calling it multiple times has the same effect as calling it oncecancel
is thread-safe, otherwise you cannot guarantee idempotency
In order to quickly build Cancelable
instances:
import monix.execution.Cancelable
// A cancelable that doesn't do anything on cancel
val c1 = Cancelable.empty
// Same thing, a cancelable that doesn't do anything
val c2 = Cancelable()
// Specify a callback to be called on cancel
val c3 = Cancelable(() => println("Canceled!"))
BooleanCancelable #
The BooleanCancelable
represents a cancelable that can be queried
for its canceled status, adding the necessary isCanceled
query for
when we need it.
See the API Documentation.
package monix.execution.cancelables
trait BooleanCancelable extends Cancelable {
def isCanceled: Boolean
}
To have a reusable (immutable) BooleanCancelable
instance that’s
already canceled:
import monix.execution.cancelables._
// Building an instance that's already canceled
val c = BooleanCancelable.alreadyCanceled
// Doesn't do anything
c.cancel()
// Always returns true
c.isCanceled
To build an instance without a callback, but that can be
used to check for isCanceled
:
val c = BooleanCancelable()
// c: BooleanCancelable = monix.execution.cancelables.BooleanCancelable$$anon$1@3478e7c2
c.isCanceled
// res4: Boolean = false
c.cancel()
c.isCanceled
// res6: Boolean = true
To build an instance out of a callback that behaves like you’d expect it to:
val c = BooleanCancelable(() => println("Effect!"))
CompositeCancelable #
The CompositeCancelable
is an aggregate of cancelable
references (to which you can add new references or remove existing ones)
and that are handled in aggregate when doing a cancel()
.
See the API Documentation.
The contract for CompositeCancelable
:
- adding and removing cancelables from the composite is thread-safe
- if the composite was already canceled, then adding new references to it will trigger their cancelation
- upon cancelation all references are released
Usage:
import monix.execution.Cancelable
val c = CompositeCancelable()
c += Cancelable(() => println("Canceled #1"))
c += Cancelable(() => println("Canceled #2"))
c += Cancelable(() => println("Canceled #3"))
// Cancelling will trigger all 3 of them
c.cancel()
// Appending a new cancelable to it after cancel
// will trigger its cancelation immediately
c += Cancelable(() => println("Canceled #4"))
// => Canceled #4
We can add cancelables references to our composite, but we can also remove them, maybe because they are no longer relevant, for GC purposes, etc:
val composite = CompositeCancelable()
val c1 = Cancelable(() => println("Canceled #1"))
val c2 = Cancelable(() => println("Canceled #2"))
composite += c1
composite += c2
// Removing from our composite
composite -= c1
// Canceling will now only cancel c2
composite.cancel()
// => Canceled #2
MultiAssignCancelable #
The MultiAssignCancelable
is a cancelable that behaves like a
variable, referencing another cancelable reference that can be
swapped as needed.
See the API Documentation.
Contract:
- assignment is thread-safe
- cancelation will trigger the cancelation of the underlying cancelable and releasing it in order to be free for GC
- if our assignable cancelable was canceled, then upon subsequent assignments, the references will be canceled immediately
Usage:
val multiAssign = MultiAssignCancelable()
val c1 = Cancelable(() => println("Canceled #1"))
multiAssign := c1
val c2 = Cancelable(() => println("Canceled #2"))
multiAssign := c2
// Canceling it will only cancel the last assignee
multiAssign.cancel()
// => Cancelled #2
// Subsequent assignments are canceled immediately
val c3 = Cancelable(() => println("Canceled #3"))
multiAssign := c3
// => Canceled #3
OrderedCancelable #
In a multi-threading environment sometimes we cannot guarantee an
ordering on assignment and obviously ordering is important.
OrderedCancelable
is similar to MultiAssignCancelable
with an
extra ability to do orderedUpdate
. There’s a second assignment
operation that takes an order
numeric argument and
in case the update was made with an order
that’s strictly bigger
than the current one you’re trying to make, then the assignment gets
ignored, so:
// Let's simulate a race condition
import monix.execution.Scheduler.{global => scheduler}
val c = OrderedCancelable()
scheduler.execute(new Runnable {
def run(): Unit =
c.orderedUpdate(
Cancelable(() => println("Number #2")),
order = 2)
})
c.orderedUpdate(
Cancelable(() => println("Number #1")),
order = 1)
In the example above, there is no happens-before relationship between
the two orderedUpdate
attempts, so we have non-determinism, because
a parallel thread might be faster than our current one and trigger the
number 2 update before number 1. But this will leave us with a result
that we might not want, because then the last update would be
number 1. So here is where orderedUpdate
comes in handy. We
explicitly specify an order
argument and thus force an ordering to
it.
The example above is obvious, right? But the following one isn’t and
it’s in fact a pretty common pattern. Let’s build a function that
executes things with a delay, tasks that can be canceled. To
add a delay, we’d use a Scheduler
and we want to return
a Cancelable
that can cancel either the delay or the result
of our passed function argument, like:
// INCORRECT EXAMPLE
import concurrent.duration._
import monix.execution._
def delayedExecution(cb: () => Cancelable)
(implicit s: Scheduler): Cancelable = {
val ref = OrderedCancelable()
ref := s.scheduleOnce(5.seconds) {
ref := cb()
}
ref
}
You may not notice it, but this is a race condition that can
yield non-deterministic behavior. Lets say the garbage collector
has problems and freezes the whole process for 5 whole seconds.
This can mean that ref := cb()
might execute before the result
of s.scheduleOnce
returns, as the call to :=
does not have
a happens-before relationship with the actual delayed scheduling.
Which means the cancelable returned by our function will be incorrect.
Lets fix it:
import concurrent.duration._
import monix.execution._
def delayedExecution(cb: () => Cancelable)
(implicit s: Scheduler): Cancelable = {
val ref = OrderedCancelable()
val delay = s.scheduleOnce(5.seconds) {
ref.orderedUpdate(cb(), 2)
}
// This should be the first update, but
// if not, then it is ignored!
ref.orderedUpdate(delay, 1)
ref
}
SingleAssignCancelable #
The SingleAssignCancelable
is similar to the
OrderedCancelable
, except that it can be assigned once and
only once.
See the API Documentation.
The contract:
- it is thread-safe
- if canceled after assignment, the underlying cancelable gets canceled and the reference released for GC purposes
- if canceled while empty, then the assigned cancelable will be canceled immediately on assignment
- if assignment happens a second time, then the operation
will throw an
IllegalStateException
, so don’t do that
It is useful in cases you need a forward reference, like:
val ref = SingleAssignCancelable()
ref := scheduler.scheduleAtFixedRate(0.seconds, 5.seconds) {
// This wouldn't be correct without having an already
// initialized value, as it would be a forward reference
// (e.g. a reference used before it's initialized), which
// could lead to a NullPointerException, not cool!
ref.cancel()
}
It is also possible to specify at construction time an extra cancelable reference to cancel, in addition to the assigned reference:
val c = {
val guest = Cancelable(() => println("extra canceled"))
SingleAssignCancelable.plusOne(guest)
}
c := Cancelable(() => println("primary canceled"))
c.cancel()
//=> extra canceled
//=> primary canceled
You can use OrderedCancelable
for the same purpose of
course, but the implementation of SingleAssignCancelable
is more
efficient (e.g. using getAndSet
, cheaper than compareAndSet
) and
the IllegalStateException
is nice when dealing with concurrent code
that isn’t doing what it’s supposed to do.
SerialCancelable #
The SerialCancelable
is also similar to OrderedCancelable
,
being a cancelable whose underlying reference can be swapped by another
cancelable, causing the previous cancelable to be canceled on assignment.
See the API Documentation.
Contract:
- assignment is thread-safe
- cancelation will trigger the cancelation of the underlying cancelable and releasing it in order to be free for GC
- if our assignable cancelable was canceled, then upon subsequent assignments, the references will be canceled immediately
- an assignment of a new cancelable will cause the previously stored cancelable to be canceled
Usage:
val ref = SerialCancelable()
ref := Cancelable(() => println("Canceled #1"))
// Will cancel the previous one
ref := Cancelable(() => println("Canceled #2"))
// => Canceled #1
// Will cancel the previous one
ref := Cancelable(() => println("Canceled #3"))
// => Canceled #2
// Will cancel the current one
ref.cancel()
// => Canceled #3
// Afterwards everything gets canceled on assignment
ref := Cancelable(() => println("Canceled #4"))
// => Canceled #4
RefCountCancelable #
Represents a Cancelable
that can create dependent cancelable objects
and that only executes the canceling logic when all dependent
cancelable objects have been canceled.
Contract:
- thread-safe
- if we create a total number of
N
children cancelables withacquire()
, in order for cancelation to occur we need to cancel allN
cancelables, in addition to triggering a cancelation on the parent - the ordering of cancelation doesn’t matter (e.g. parent first, children next, or vice-versa)
- on creating cancelables with
acquire()
, if our parent is already canceled, then the returned reference will be aCancelable.empty
Sample:
val refs = RefCountCancelable { () =>
println("Everything was canceled")
}
// acquiring two cancelable references
val ref1 = refs.acquire()
val ref2 = refs.acquire()
// Starting the cancelation process
refs.cancel()
// This is now true, but the callback hasn't been invoked yet
refs.isCanceled
// res: Boolean = true
// After our RefCountCancelable was canceled, this will return
// a Cancelable.empty
val ref3 = refs.acquire()
ref3 == Cancelable.empty
// res: Boolean = true
// Release reference 1, nothing happens here
ref1.cancel()
// Release reference 2, now we can execute
ref2.cancel()
// => Everything was canceled
This cancelable type is actually used in Observable.merge
(•‿•)