EventState

An EventState is a common abstraction to help you manage best practices for dealing with event-sourced state. It can only be created with an initial value, and optionally a stream of events to “rehydrate” it by folding over them.

Example for only initial state:

import cats.effect._
import dev.rpeters.fs2.es.EventState

val initialEventState = for {
  es <- EventState[IO].initial[Int, Int](1)(_ + _)
  _ <- es.doNext(1)
  result <- es.get
} yield result
initialEventState.unsafeRunSync()
// res0: Int = 2

Example for rehydrating state:

import fs2.Stream

val eventState = EventState[IO].initial[Int, Int](1)(_ + _)
val hydratedState: IO[Int] = for {
  es <- eventState
  _ <- es.hydrate(Stream.emit(1))
  result <- es.get
} yield result
hydratedState.unsafeRunSync()
// res1: Int = 2

The only way to change a value in an EventState is to supply it manually to doNext or supply a hydrating stream of events. In this way, an EventState is basically just a small wrapper around a cats.effect.concurrent.Ref that enforces an event-based access pattern.

You can also “hook up” a stream of events to an EventState to hydrate it and get a stream of the resulting states back:

val hookedUpStream = EventState[IO].initial[Int, Int](1)(_ + _).flatMap { es =>
  Stream(1, 1, 1).through(es.hookup).compile.toList
}
hookedUpStream.unsafeRunSync()
// res2: List[Int] = List(2, 3, 4)

When using hookup, if you only have a single event stream going into your EventState then the resulting stream is guaranteed to have all possible state changes.

SignallingEventState

FS2 has some useful concurrency primitives in the form of SignallingRefs and Topics, among others. SignallingRef is useful when you want to use a variable as a signal where you continuously want the latest state change. SignallingEventState is similar in that it has properties just like a SignallingRef but it also allows you to continuously stream state changes from it. In particular, the following two methods are available:

  • continuous - Get a stream that continuously emits the latest state at the time
  • discrete - Get a stream of the latest state values, but only when the new state value is distinct from the previous value

You should be aware that you are not guaranteed every single state change using these methods. It is equivallent to repeatedly calling get and maybe doing some stateful filtering after. If you actually want every single resulting state, you can use a single input stream with the hookup pipe. Or, you can use EventStateTopic and subscribe.

EventStateTopic

EventStateTopic is a variant of EventState that allows you to subscribe to all new state changes, just like an FS2 Topic. It has a new method subscribe that returns a stream of all state changes from the moment of subscription, beginning with its current value. You can also hookupAndSubscribe which will concurrently apply all events from the current stream, as well as subscribe and return all events, including ones not from the input stream. This can be very useful if you have multiple spots where you want to “listen for” new state changes and get all of them, instead of just the latest one. The downside of doing this approach is it is less efficient, as every single state change must be processed instead of just the latest one available, but sometimes that is the exact semantic that you want.

If you are debugging, look into ReplayableEventState from the testing package, which is based on EventStateTopic.