Tips for working with FS2

The streaming library fs2 has had major improvements in its latest release (v0.10), and libraries like Http4s v0.18 have adopted this new version. As you work more with fs2 and Http4s there are some things you should be aware of that will make the journey easier. Specifically, we will look at how to work with flatMap in Streams and at Topics, along with some minor comments on fs2 Streams. These are hints which I, as someone not very familiar with fs2 at the time, wish I had known sooner.

EDIT: This post has been updated thanks to feedback from Fabio Labella (@SystemFw in Gitter) on the Topics section.

Without further ado, let’s talk about some ‘gotchas’ we may find when working with fs2.

Everything is a Stream

When you come from a non-FP world, something slightly surprising when you start working with fs2 is the fact that everything is a Stream. And by everything I mean everything, even your Queue:

val queue: Stream[F, Queue[F, String]] = Stream.eval(async.circularBuffer[F, String](5))

The snippet above is the official example of how to create a Queue. Granted, we can do the following:

val queue: F[Queue[F, String]] = async.circularBuffer[F, String](5)

but given that working with Stream is easier than working with F[_], we should probably stick to the first example.
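
To get a feel for the difference, here is a sketch (assuming the same imports as the full example below) of what working from F[Queue[F, String]] looks like: the stream ends up nested inside the effect, and we need an extra flattening step.

// Starting from the effect, the dequeue stream is trapped inside IO
val awkward: IO[Stream[IO, String]] =
  async.circularBuffer[IO, String](5).map(q => q.dequeue)

// We still have to lift into Stream and flatten before we can use it
val flattened: Stream[IO, String] = Stream.eval(awkward).flatMap(identity)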

The fact that our Queue lives inside a Stream means the interaction with the Queue itself happens inside a flatMap call, as in this example:

import cats.effect.IO
import fs2._
import fs2.async.mutable.Queue
import scala.concurrent.ExecutionContext.Implicits.global

object SampleCode extends App {
  val queue: Stream[IO, Queue[IO, String]] = Stream.eval(async.circularBuffer[IO, String](5))

  val element: Stream[IO, String] =
    for {
      q    <- queue
      data <- q.dequeue
    } yield data
}

In the snippet above we use flatMap (via a for-comprehension) to obtain the elements stored in the queue. Get comfortable with flatMap: with fs2 we will be using it a lot.
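
For reference, that for-comprehension is just sugar for a flatMap call; the same stream can be written directly as:

// Equivalent to the for-comprehension: flatMap unwraps the Queue
// so we can call dequeue on it.
val element2: Stream[IO, String] = queue.flatMap(q => q.dequeue)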

Beware infinite streams

Streams are useful for many reasons, but one of the common use cases is processing an infinite stream of data: we don’t have enough memory to store infinite data in a List, but a Stream lets us process the elements as they arrive. For example:

import cats.effect._
import fs2._

object SampleCode extends App {

  val infiniteStream = Stream.emit(1).repeat.covary[IO].map(_ + 3)

  val output = infiniteStream.compile.toVector.unsafeRunSync()
  println(s"Result >> $output") // prints Result >> Vector(4, 4, 4, 4, ... )

}

The snippet above creates an infinite stream of data, emitting the value 1. Every value we emit is then mapped, converting it to 4. The result is a Vector full of 4s. Except I lied: the code above is a non-terminating program.

The reason this happens is the way streams, as a functional programming structure, behave. Each operation in the stream happens sequentially, and previous steps must complete before the next ones run. If you look at the implementation of toVector, for example, it uses a fold behind the scenes. But a fold over an infinite stream will never complete, as there will always be more data to append.
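
To make this concrete, here is a sketch of what collecting the stream amounts to: like toVector, compile.fold must consume every element before it can produce a result, so on our infinite stream this IO never completes.

// Conceptually what toVector does: fold over every emitted element,
// appending each one. On an infinite stream this never finishes.
val collected: IO[Vector[Int]] =
  infiniteStream.compile.fold(Vector.empty[Int])(_ :+ _)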

One way to solve this issue is to limit the amount of data we emit:

import cats.effect._
import fs2._

object SampleCode extends App {

  val infiniteStream = Stream.emit(1).repeat.covary[IO].map(_ + 3).take(4)

  val output = infiniteStream.compile.toVector.unsafeRunSync()
  println(s"Result >> $output") // prints Result >> Vector(4, 4, 4, 4)

}

This program terminates, and emits exactly 4 values.

This may not always be an issue, depending on how we interact with our streams. For example, an Http4s WebSocket will keep consuming the data and sending it to the client for as long as the connection is alive. But sometimes we must run an infinite stream ourselves, in which case we must consider how to bound or interrupt it.
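
Besides take, fs2 v0.10 also lets us interrupt a stream from the outside. Here is a sketch (using the Scheduler and interruptWhen, with the infiniteStream from the first snippet) that halts the stream after three seconds:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// A stream that emits true after 3 seconds, used as a termination signal
val stopAfter3s: Stream[IO, Boolean] =
  Scheduler[IO](corePoolSize = 1).flatMap(_.sleep[IO](3.seconds)).map(_ => true)

// interruptWhen halts the infinite stream when the signal emits true
val bounded: Stream[IO, Int] = infiniteStream.interruptWhen(stopAfter3s)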

Flatmap all the things… except some Streams

Queues are one of the recommended ways to integrate data from external sources into your streams. In particular, the documentation example shows the following:

import fs2._
import fs2.async
import scala.concurrent.ExecutionContext
import cats.effect.{ Effect, IO }

type Row = List[String]

trait CSVHandle {
  def withRows(cb: Either[Throwable,Row] => Unit): Unit
}

def rows[F[_]](h: CSVHandle)(implicit F: Effect[F], ec: ExecutionContext): Stream[F,Row] =
  for {
    q <- Stream.eval(async.unboundedQueue[F,Either[Throwable,Row]])
    _ <- Stream.eval { F.delay(h.withRows(e => async.unsafeRunAsync(q.enqueue1(e))(_ => IO.unit))) }
    row <- q.dequeue.rethrow
  } yield row

where we register a callback via withRows to enqueue data, and we receive a stream from dequeuing this data. In our own codebase we probably want to split the rows function into several smaller functions, as we may want to process the data in the queue before dequeuing. Let’s do this code split as an exercise.

The code is supposed to work with external streams of data. These may be the input from a WebSocket, or a stream reading from an external source like a Kafka topic. In our example, we can fake the third-party data source using a Scheduler that emits a String every second:

val streamData: Stream[IO, String] = Scheduler[IO](corePoolSize = 1).flatMap { scheduler =>
  scheduler.awakeEvery[IO](1.second).map(_ => (System.currentTimeMillis() % 10000).toString)
}

Our code will use a queue to store the data we read from the data source until we process it. This means we need a method to enqueue data:

def enqueueData[F[_]](q: Queue[F, String])(implicit F: Effect[F]): Stream[F, Future[Unit]] =
  Stream.eval(
    F.delay(
      streamData
        .map(s => {
          async.unsafeRunAsync(q.enqueue1(s))(_ => IO.unit)
        })
        .compile
        .drain
        .unsafeToFuture()
    )
  )

Let’s understand what is going on in this method. Our function receives a queue q as a parameter, which is the queue we will use to store the data as we receive it. When our fake external source of data, streamData, has a new value available, we enqueue the value with q.enqueue1(s). But q.enqueue1(s), as a good FP citizen, just returns an F[Unit] without triggering any side effect. We have to explicitly run it with async.unsafeRunAsync to enqueue the element. You can see the same pattern in the documentation.
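
A small illustration of the difference, assuming a Queue[IO, String] named q and an implicit ExecutionContext in scope:

val enqueueAction: IO[Unit] = q.enqueue1("hello") // only describes the enqueue
async.unsafeRunAsync(enqueueAction)(_ => IO.unit) // actually performs it, asynchronously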

At this point we have declared our Stream and the logic to enqueue elements. But the Stream will do nothing unless we run it! This is where the call to compile.drain.unsafeToFuture() becomes relevant. Without it, our fake stream will not be executed and no element will be added to the queue. The rest of the code is just wrapping to return a Stream[F, Future[Unit]] as the result of this function.
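
The same ‘describe first, run later’ split applies to whole streams; a minimal sketch:

// compile.drain only builds a description of running the stream...
val description: IO[Unit] = streamData.compile.drain

// ...and nothing happens until we execute that description
val running: scala.concurrent.Future[Unit] = description.unsafeToFuture()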

Now that we know how to enqueue, let’s provide a mechanism for retrieving data from the queue:

def dequeueData[F[_]](q: Queue[F, String])(implicit F: Effect[F]) = q.dequeue.take(4)

A much simpler method. Given a queue, we call dequeue to obtain a Stream[F, String]. The take call here is very important, as we are working with an infinite stream of data. Remember the previous section.

Having a way to enqueue and dequeue, our last requirement is a function that creates a queue and proceeds to call these two functions, so we can get data from our fake external source:

def withQueue[F[_]](implicit F: Effect[F]): Stream[F, String] = {
  val queue: Stream[F, Queue[F, String]] = Stream.eval(async.circularBuffer[F, String](5))

  val enqueueStream = queue.flatMap { q =>
    enqueueData(q)
  }

  val dequeueStream = queue.flatMap { q =>
    dequeueData(q)
  }

  dequeueStream.concurrently(enqueueStream)
}

In this snippet we create a queue, which is backed by a circular buffer (to limit memory consumption), and we use that queue with our previously defined functions enqueueData and dequeueData.
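
As an aside, here is a quick sketch of why the circular buffer bounds memory: with capacity 2, enqueueing a third element discards the oldest one instead of blocking (traverse_ comes from cats.implicits._; demo is just an illustrative name):

import cats.implicits._

val demo: IO[Vector[Int]] = (for {
  q   <- Stream.eval(async.circularBuffer[IO, Int](2))
  _   <- Stream.eval(Vector(1, 2, 3).traverse_(q.enqueue1)) // 1 is discarded
  out <- q.dequeue.take(2)
} yield out).compile.toVector
// expected: demo.unsafeRunSync() == Vector(2, 3)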

The full sample, all together with the relevant imports and the call to execute it, looks like this:

import cats.effect.{ Effect, IO }
import fs2._
import fs2.async.mutable.Queue
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SampleCode extends App {

  val streamData: Stream[IO, String] = Scheduler[IO](corePoolSize = 1).flatMap { scheduler =>
    scheduler.awakeEvery[IO](1.second).map(_ => (System.currentTimeMillis() % 10000).toString)
  }

  def enqueueData[F[_]](q: Queue[F, String])(implicit F: Effect[F]) =
    Stream.eval(
      F.delay(
        streamData
          .map(s => {
            async.unsafeRunAsync(q.enqueue1(s))(_ => IO.unit)
          })
          .compile
          .drain
          .unsafeToFuture()
      )
    )

  def dequeueData[F[_]](q: Queue[F, String])(implicit F: Effect[F]) = q.dequeue.take(4)

  def withQueue[F[_]](implicit F: Effect[F]): Stream[F, String] = {
    val queue: Stream[F, Queue[F, String]] = Stream.eval(async.circularBuffer[F, String](5))

    val enqueueStream = queue.flatMap { q =>
      enqueueData(q)
    }

    val dequeueStream = queue.flatMap { q =>
      dequeueData(q)
    }

    dequeueStream.concurrently(enqueueStream)
  }

  val resultQueue = withQueue[IO].compile.toVector.unsafeRunSync()
  println(s"Queue >> $resultQueue") // hangs

}

This seems like a reasonable piece of code to manage external sources. Alas, it doesn’t work: running the example will hang the process.

The reason is that queue, which is a Stream of Queue[F, String], creates a new queue on each flatMap. This means enqueueStream and dequeueStream are using different queues inside their streams, so they have no visibility of each other’s data. I found this especially easy to miss when working with queues wrapped in the provided Streams.
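
A minimal sketch of this re-evaluation, using a timestamp so the difference is visible: each use of creation in the for-comprehension runs the effect again and produces a different value.

val creation: Stream[IO, Long] = Stream.eval(IO(System.nanoTime()))

val pair: Stream[IO, (Long, Long)] = for {
  a <- creation
  b <- creation
} yield (a, b) // a != b: two independent evaluations of the effect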

To get the code working, replace withQueue with:

  def withQueue[F[_]](implicit F: Effect[F]): Stream[F, String] = {
    val queue: Stream[F, Queue[F, String]] = Stream.eval(async.circularBuffer[F, String](5))

    queue.flatMap { q =>
      val enqueueStream = enqueueData(q)

      val dequeueStream = dequeueData(q)

      dequeueStream.concurrently(enqueueStream)
    }
  }

In this new snippet we call the support functions inside flatMap and return, as the result of the operation, the combination of both streams. Please note that the last step is to run the resulting streams in parallel via concurrently. Without this last step the process won’t work.

The reason is that at the end of the process we call compile.toVector.unsafeRunSync() to execute our stream. Without concurrently, one of the streams won’t be part of the stream we are executing, and as a consequence the operations associated with it won’t happen: we won’t enqueue or dequeue data. You can verify this by returning just one of the streams.
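
A sketch of that failure mode: returning only the dequeue side compiles a stream that never enqueues anything, so it hangs just like the broken version.

def withQueueBroken[F[_]](implicit F: Effect[F]): Stream[F, String] =
  Stream.eval(async.circularBuffer[F, String](5)).flatMap { q =>
    dequeueData(q) // enqueueData(q) is never part of the compiled stream
  }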

With this new snippet, both operations share the same queue and the result is printed as expected.

Topics

Topics provide fs2’s implementation of the publish-subscribe pattern. They are extremely useful but, alas, barely documented. They have a twist that makes them interesting to work with compared to other pub-sub systems: they are completely side-effect free.

Creating a topic is easy enough:

import cats.effect.{ Effect, IO }
import fs2._
import scala.concurrent.ExecutionContext.Implicits.global

object SampleCode extends App {

  def withTopic[F[_]](implicit F: Effect[F]): Stream[F, String] = {
    val topicStream = Stream.eval(fs2.async.topic[F, String]("Topic start"))

    topicStream.flatMap { topic =>
      val publisher = Stream.emit("1").repeat.covary[F].to(topic.publish)

      val subscriber = topic.subscribe(10).take(4)

      subscriber.concurrently(publisher)
    }

  }

  val resultTopic = withTopic[IO].compile.toVector.unsafeRunSync()
  println(s"Topic >> $resultTopic") // prints Topic >> Vector(Topic start, 1, 1, 1)

}

Each time we flatMap over topicStream we create a new topic, with its own subscribers and publishers. This works for this example, but we hit a familiar problem if we want to reuse that topic to add multiple publishers and subscribers (for example inside an http4s service): each call to flatMap creates a new topic, so data isn’t shared and different requests don’t see the same values.

In a previous version of this post we had a solution that used a mutable map as an intermediate step to store the topic and share it between requests. But thanks to Fabio Labella we have a much nicer solution at hand:

import fs2._
import fs2.async.mutable.Topic
import fs2.StreamApp.ExitCode
import cats.implicits._
import cats.effect._

import scala.concurrent.ExecutionContext

object SampleCode2 extends StreamApp[IO] {

  def sharedTopicStream[F[_]: Effect](topicId: String)(implicit ec: ExecutionContext): Stream[F, Topic[F, String]] =
    Stream.eval(async.topic[F, String](s"Topic $topicId start"))

  def addPublisher[F[_]](topic: Topic[F, String], value: String): Stream[F, Unit] =
    Stream.emit(value).covary[F].repeat.to(topic.publish)

  def addSubscriber[F[_]](topic: Topic[F, String]): Stream[F, String] =
    topic
      .subscribe(10)
      .take(4)

  // a request that adds a publisher to the topic
  def requestAddPublisher[F[_]](value: String, topic: Topic[F, String]): Stream[F, Unit] =
    addPublisher(topic, value)

  // a request that adds a subscriber to the topic
  def requestAddSubscriber[F[_]](topic: Topic[F, String])(implicit F: Effect[F]): Stream[F, Unit] =
    addSubscriber(topic).to(Sink.showLinesStdOut)

  def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] = {
    import ExecutionContext.Implicits.global
    // we simulate requests that work on a common topic.
    val sharedTopic = sharedTopicStream[IO]("sharedTopic")

    // sharedTopic is passed to your Services, which use it as necessary
    sharedTopic.flatMap { topic =>
      requestAddPublisher("publisher1", topic) concurrently
      requestAddPublisher("publisher2", topic) concurrently
      requestAddSubscriber(topic) concurrently
      requestAddSubscriber(topic)

    }.drain ++ Stream.emit(ExitCode.Success)
  }

}

Please note that in the code above the JVM won’t exit after running the example, as we are extending the StreamApp[IO] trait. This is expected behaviour!

The key to the solution is the creation of sharedTopic, which is then used to provide a common topic to multiple requests, which we simulate (to keep the example simple) via plain function calls. This approach removes the need for ugly unsafeRunSync calls inside the code, and it gives rise to an amazing property: the region in which you share any given piece of pure mutable state is the same as your call graph, and that makes shared state easy to reason about, unlike its side-effecting counterpart.

The mistake I made in the previous code sample, which I’ve been assured is unfortunately quite common, was replicating a pattern from the object-oriented world: storing the topic in a transient structure (a mutable map) so as to share it across requests. But in a functional world we pass each function everything it needs, maintaining referential transparency.

In short: if you need to share data structures like topics, resist the temptation to use mutable structures to store them, and pass the structure you need as a parameter. At the top of your call graph you will be able to instantiate the necessary instance and share it downwards.

Conclusions

We have reviewed some possible pitfalls we may encounter when starting to work with fs2, along with sample code demonstrating both the problematic snippets and their solutions. Hopefully this will help with the transition to http4s v0.18, especially when working with WebSockets!

Acknowledgements

Thanks to Danielle Ashley, Dave Gurnell and Richard Dallaway for their comments on the draft. Thanks to the fs2 Gitter channel, and especially to Fabio Labella, for their help understanding several fs2 concepts. And thanks to Underscore for allowing me to publish this here :)


