trait ProducerF[F[_], E, A] extends Serializable
A simple interface that models the producer side of a producer-consumer communication channel.
In a producer-consumer communication channel we've got these concerns to take care of:
- back-pressure, which is handled automatically via this interface
- halting the channel with a final event and informing all current and future consumers about it, while stopping future producers from pushing more events
The ProducerF interface takes care of these concerns via:
- the
F[Boolean]result, which should returntruefor as long as the channel wasn't halted, so further events can be pushed; these tasks also block (asynchronously) when internal buffers are full, so back-pressure concerns are handled automatically - halt, being able to close the channel with a final event that will be visible to all current and future consumers
Currently implemented by ConcurrentChannel.
- Source
- ProducerF.scala
- Alphabetic
- By Inheritance
- ProducerF
- Serializable
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Abstract Value Members
- abstract def awaitConsumers(n: Int): F[Boolean]
Awaits for the specified number of consumers to be connected.
Awaits for the specified number of consumers to be connected.
This is an utility to ensure that a certain number of consumers are connected before we start emitting events.
- n
is a number indicating the number of consumers that need to be connected before the returned task completes
- returns
a task that will complete only after the required number of consumers are observed as being connected to the channel
- abstract def halt(e: E): F[Unit]
Closes the communication channel with a message that will be visible to all current and future consumers.
Closes the communication channel with a message that will be visible to all current and future consumers.
Note that if multiple
haltevents happen, then only the first one will be taken into account, all otherhaltmessages are ignored. - abstract def push(a: A): F[Boolean]
Publishes an event on the channel.
Publishes an event on the channel.
Contract:
- in case the internal buffers are full, back-pressures until there's enough space for pushing the message, or until the channel was halted, whichever comes first
- returns
truein case the message was pushed in the internal buffer orfalsein case the channel was halted and cannot receive any more events, in which case the producer's loop should terminate
Example:
import cats.implicits._ import cats.effect.Async def range[F[_]](channel: ProducerF[F, Int, Int], from: Int, until: Int) (implicit F: Async[F]): F[Unit] = { if (from < until) { if (from + 1 < until) channel.push(from).flatMap { case true => // keep going range(channel, from + 1, until) case false => // channel was halted by another producer loop, so stopping F.unit } else // we are done, publish the final event channel.halt(from + 1).as(()) } else { F.unit // invalid range } }
- a
is the message to publish
- returns
truein case the message was published successfully, orfalsein case the channel was halted by another producer
- abstract def pushMany(seq: Iterable[A]): F[Boolean]
Publishes multiple events on the channel.
Publishes multiple events on the channel.
Contract:
- in case the internal buffers are full, back-pressures until there's enough space and the whole sequence was published, or until the channel was halted, in which case no further messages are allowed for being pushed
- returns
truein case the whole sequence was pushed in the internal b orfalsein case the channel was halted and cannot receive any more events, in which case the producer's loop should terminate
Note: implementations may try to push events one by one. This is not an atomic operation. In case of concurrent producers, there's absolutely no guarantee for the order of messages coming from multiple producers. Also in case the channel is halted (and the resulting task returns
false), the outcome can be that the sequence gets published partially.import cats.implicits._ import cats.effect.Async def range[F[_]](channel: ProducerF[F, Int, Int], from: Int, until: Int) (implicit F: Async[F]): F[Unit] = { if (from < until) { val to = until - 1 channel.pushMany(Range(from, to)).flatMap { case true => channel.halt(to).as(()) // final event case false => // channel was halted by a concurrent producer, so stop F.unit } } else { F.unit // invalid range } }
- seq
is the sequence of messages to publish on the channel
- returns
truein case the message was published successfully, orfalsein case the channel was halted by another producer

This is the API documentation for the Monix library.
Package Overview
monix.execution exposes lower level primitives for dealing with asynchronous execution:
Atomictypes, as alternative tojava.util.concurrent.atomicmonix.catnap exposes pure abstractions built on top of the Cats-Effect type classes:
monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
monix.reactive exposes the
Observablepattern:Observableimplementationsmonix.tail exposes Iterant for purely functional pull based streaming:
BatchandBatchCursor, the alternatives to Scala'sIterableandIteratorrespectively that we are using within Iterant's encodingYou can control evaluation with type you choose - be it Task, Coeval, cats.effect.IO or your own as long as you provide correct cats-effect or cats typeclass instance.