Tips for working with FS2

The streaming library fs2 has had major improvements in its latest release (v0.10), and libraries like Http4s v0.18 have adopted this newest version. As you work more with fs2 and Http4s there are some things you should be aware of, as they will make the journey easier. Specifically, we will look at how to work with flatMap in Streams and at Topics, along with some minor comments on fs2 Streams. These are hints which I, as someone not very familiar with fs2 at the time, wish I had known sooner.

Without further ado, let’s talk about some ‘gotchas’ we may find when working with fs2.

Everything is a Stream

When you come from a non-FP world, something slightly surprising when you start working with fs2 is the fact that everything is a Stream. By everything I mean everything, even your Queue:

val queue: Stream[F, Queue[F, String]] = Stream.eval(async.circularBuffer[F, String](5))

The snippet above is the official example on how to create a Queue. Granted, we can do the following:

val queue: F[Queue[F, String]] = async.circularBuffer[F, String](5)

but given that working with Stream is easier than working with F[_], we probably should stick to the first example.

The fact that a Queue is a stream means the interaction with the Queue itself happens inside a flatMap call, as in this example:

import cats.effect.{ Effect, IO }
import fs2._
import fs2.async.mutable.Queue
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SampleCode extends App {
  val queue: Stream[IO, Queue[IO, String]] = Stream.eval(async.circularBuffer[IO, String](5))

  val element: Stream[IO, String] = 
     for {
        q <- queue
        data <- q.dequeue
     } yield data
}     

In the snippet above we use flatMap (as a for-comprehension) to obtain the elements stored in the queue. Get comfortable with flatMap, as with fs2 we will be using it a lot.

Beware infinite streams

Streams are useful for many reasons, but one of the common examples is processing an infinite stream: we don’t have enough memory to store infinite data in a List but with a Stream we can process an infinite stream correctly. For example:

import cats.effect._
import fs2._

object SampleCode extends App {

  val infiniteStream = Stream.emit(1).repeat.covary[IO].map(_ + 3)

  val output = infiniteStream.compile.toVector.unsafeRunSync()
  println(s"Result >> $output") // prints Result >> Vector(4, 4, 4, 4, ... )

}

The snippet above creates an infinite stream of data, emitting the value 1. Every value we emit is then mapped to convert it to 4. The result is a Vector full of 4. Except I lied and the result of the code above is a non-terminating program.

The reason this happens is the way toVector works. If you look at its implementation, it uses a fold behind the scenes: it appends each element to an accumulator and can only return the result once the stream has ended. But a fold of an infinite stream will never complete, as we will always have more data to append.
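The same behaviour can be reproduced without fs2. The sketch below is only an analogy built on the standard library Iterator, not the fs2 implementation: the collect helper is a hypothetical name, and it gathers elements with a fold in the way described above.

```scala
object FoldNeedsAnEnd {
  // Hypothetical helper: append every element to an accumulator and
  // return the accumulator only once the source has no more elements.
  def collect[A](source: Iterator[A]): Vector[A] =
    source.foldLeft(Vector.empty[A])(_ :+ _)

  def main(args: Array[String]): Unit = {
    // A finite source ends, so the fold can return its accumulator.
    println(collect(Iterator(1, 1, 1, 1).map(_ + 3))) // Vector(4, 4, 4, 4)

    // An infinite source never ends, so this call would never return:
    // collect(Iterator.continually(1).map(_ + 3))
  }
}
```

The fold has no point at which it can hand back a partial result, which is why the only way out is to make the source finite before collecting it.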

A way to solve this issue is to limit the data we emit, like:

import cats.effect._
import fs2._

object SampleCode extends App {

  val infiniteStream = Stream.emit(1).repeat.covary[IO].map(_ + 3).take(4)

  val output = infiniteStream.compile.toVector.unsafeRunSync()
  println(s"Result >> $output") // prints Result >> Vector(4, 4, 4, 4)

}

This program terminates, and emits exactly 4 values.

This may not always be an issue, depending on how we interact with our streams. For example, an Http4s WebSocket will be consuming all the data and sending it to the client, as long as the connection is alive. But there’s the chance we must run an infinite stream, in which case we must consider how to manage it.

Flatmap all the things… except some Streams

Using queues is one of the recommended ways to integrate data from external data sources into your streams. In particular the documentation example shows the following:

import fs2._
import fs2.async
import scala.concurrent.ExecutionContext
import cats.effect.{ Effect, IO }

type Row = List[String]

trait CSVHandle {
  def withRows(cb: Either[Throwable,Row] => Unit): Unit
}

def rows[F[_]](h: CSVHandle)(implicit F: Effect[F], ec: ExecutionContext): Stream[F,Row] =
  for {
    q <- Stream.eval(async.unboundedQueue[F,Either[Throwable,Row]])
    _ <-  Stream.eval { F.delay(h.withRows(e => async.unsafeRunAsync(q.enqueue1(e))(_ => IO.unit))) }
    row <- q.dequeue.rethrow
  } yield row

where we do a callback to withRows to enqueue data, and we receive a stream from dequeuing this data. In our codebase we probably want to split the rows function into several smaller functions, as we may want to process the data in the queue before dequeuing. Let’s do this code split as an exercise.

The code is supposed to work with external streams of data. These may be the input from a WebSocket, or a stream reading from an external source like a Kafka topic. In our example, we can fake the 3rd party data source using a Scheduler that emits a String every second:

val streamData: Stream[IO, String] = Scheduler[IO](corePoolSize = 1).flatMap { scheduler =>
  scheduler.awakeEvery[IO](1.second).map(_ => (System.currentTimeMillis() % 10000).toString)
}

Our code will be using a queue to store the data we read from the data source until we process it. This means we need a method that will enqueue data to the queue:

def enqueueData[F[_]](q: Queue[F, String])(implicit F: Effect[F]): Stream[F, Future[Unit]] =
  Stream.eval(
    F.delay(
      streamData
        .map(s => {
          async.unsafeRunAsync(q.enqueue1(s))(_ => IO.unit)
        })
        .compile
        .drain
        .unsafeToFuture()
    )
  )

Let’s understand what is going on in this method. Our function receives a queue q as a parameter, which is the queue we will use to store the data as we receive it. When our fake external source of data, streamData, has a new value available we enqueue the value with q.enqueue1(s). But q.enqueue1(s), as a good FP citizen, will just return an F[Unit] without triggering any side effect. We have to explicitly run it with async.unsafeRunAsync to enqueue the element. You can see the same pattern in the documentation.

At this point we have declared our Stream and the logic to enqueue elements. But the Stream will do nothing unless we run it! This is where the calls to compile.drain.unsafeToFuture() become relevant. Without them, our fake stream will not be executed and no element will be added to the queue. The rest of the code is just wrappers to return a Stream[F, Future[Unit]] as the result of executing this function.
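To see in isolation that declaring a Stream has no effect until it is run, here is a minimal sketch, assuming fs2 v0.10 and cats-effect on the classpath. The object name and the ran flag are hypothetical and exist only for illustration.

```scala
import cats.effect.IO
import fs2._

object NothingRunsUntilAsked {
  // Returns the value of the flag before and after running the stream.
  def demo(): (Boolean, Boolean) = {
    var ran = false // illustration only: a visible side effect

    // Declaring the stream only describes the effect, it does not run it.
    val stream: Stream[IO, Unit] = Stream.eval(IO { ran = true })
    val before = ran

    // Compiling and running the stream is what triggers the effect.
    stream.compile.drain.unsafeRunSync()
    val after = ran

    (before, after)
  }

  def main(args: Array[String]): Unit =
    println(demo()) // (false,true)
}
```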

Now that we know how to enqueue, let’s provide a mechanism for retrieving data from the queue:

def dequeueData[F[_]](q: Queue[F, String])(implicit F: Effect[F]) = q.dequeue.take(4)

A much simpler method. Given a queue, we call dequeue to obtain a Stream[F, String]. The take call here is very important, as we are working with an infinite stream of data. Remember the previous section.

Having a way to enqueue and dequeue, our last requirement is a function that creates a queue and proceeds to call these two functions, so we can get data from our fake external source:

def withQueue[F[_]](implicit F: Effect[F]): Stream[F, String] = {
  val queue: Stream[F, Queue[F, String]] = Stream.eval(async.circularBuffer[F, String](5))

  val enqueueStream = queue.flatMap { q =>
    enqueueData(q)
  }

  val dequeueStream = queue.flatMap { q ⇒
    dequeueData(q)
  }

  dequeueStream.concurrently(enqueueStream)
}

In this snippet we create a queue, which is backed by a circular buffer (to limit memory consumption), and we use that queue with our previously defined functions enqueueData and dequeueData.

The full sample, all together and with the relevant imports and the command to execute it, looks like:

import cats.effect.{ Effect, IO }
import fs2._
import fs2.async.mutable.Queue
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SampleCode extends App {

  val streamData: Stream[IO, String] = Scheduler[IO](corePoolSize = 1).flatMap { scheduler =>
    scheduler.awakeEvery[IO](1.second).map(_ => (System.currentTimeMillis() % 10000).toString)
  }

  def enqueueData[F[_]](q: Queue[F, String])(implicit F: Effect[F]) =
    Stream.eval(
      F.delay(
        streamData
          .map(s => {
            async.unsafeRunAsync(q.enqueue1(s))(_ => IO.unit)
          })
          .compile
          .drain
          .unsafeToFuture()
      )
    )

  def dequeueData[F[_]](q: Queue[F, String])(implicit F: Effect[F]) = q.dequeue.take(4)

  def withQueue[F[_]](implicit F: Effect[F]): Stream[F, String] = {
    val queue: Stream[F, Queue[F, String]] = Stream.eval(async.circularBuffer[F, String](5))

    val enqueueStream = queue.flatMap { q =>
      enqueueData(q)
    }

    val dequeueStream = queue.flatMap { q ⇒
      dequeueData(q)
    }

    dequeueStream.concurrently(enqueueStream)
  }

  val resultQueue = withQueue[IO].compile.toVector.unsafeRunSync()
  println(s"Queue >> $resultQueue") // hangs

}

This seems like a reasonable piece of code to manage external sources. Also, this doesn’t work. Running the example will hang the process.

The reason is that queue is a Stream wrapping the effect that creates a Queue[F, String], and that effect runs again every time the stream is used. Each flatMap on queue therefore gets a brand new queue. This means enqueueStream and dequeueStream are using different queues, and they have no visibility of each other’s data. I found this especially easy to miss when working with queues and the provided Streams.
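A minimal sketch of why the two flatMap calls end up with different queues, assuming fs2 v0.10 and cats-effect: a stream built with Stream.eval runs its effect again each time the stream is used. The counter below is a hypothetical stand-in for "create a new queue".

```scala
import cats.effect.IO
import fs2._

object EvalTwice {
  def run(): Vector[String] = {
    var created = 0 // illustration only: counts how often the effect runs

    // Stand-in for Stream.eval(async.circularBuffer[...]): each run of the
    // effect yields a fresh value, just as each run yields a fresh queue.
    val resource: Stream[IO, Int] = Stream.eval(IO { created += 1; created })

    // Two separate uses of the same stream value.
    val a = resource.flatMap(n => Stream.emit(s"a$n").covary[IO])
    val b = resource.flatMap(n => Stream.emit(s"b$n").covary[IO])

    (a ++ b).compile.toVector.unsafeRunSync()
  }

  def main(args: Array[String]): Unit =
    println(run()) // Vector(a1, b2)
}
```

The two streams observe different values (1 and 2) even though they were built from the same resource value, which is the situation enqueueStream and dequeueStream are in.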

To get the code working, replace withQueue with:

  def withQueue[F[_]](implicit F: Effect[F]): Stream[F, String] = {
    val queue: Stream[F, Queue[F, String]] = Stream.eval(async.circularBuffer[F, String](5))

    queue.flatMap { q =>
      val enqueueStream = enqueueData(q)

      val dequeueStream = dequeueData(q)

      dequeueStream.concurrently(enqueueStream)
    }
  }

In this new snippet we call the support functions inside a single flatMap, so both receive the same queue, and we return the combination of both streams as the result. Please note that the last step is to run the resulting streams in parallel via concurrently. Without this last step the process won’t work.

The reason is that at the end of the process we will call compile.toVector.unsafeRunSync to execute our stream. Without concurrently, one of the streams won’t be part of the stream we are executing, and as a consequence the operations associated with it won’t happen: you won’t enqueue or dequeue data. You can try this by just returning one of the streams.

With this new snippet, both operations share the same queue and the result is printed as expected.

Topics

Topics provide fs2’s implementation of the publish-subscribe pattern. They are extremely useful but, alas, barely documented. They have a twist that makes them interesting to work with compared to other pub sub systems: they are completely side-effect free.

Creating a topic is easy enough, but if we’re not careful we can get into the same problems as we did with Streams:

import cats.effect.{ Effect, IO }
import fs2._
import scala.concurrent.ExecutionContext.Implicits.global

object SampleCode extends App {

  def withTopic[F[_]](implicit F: Effect[F]): Stream[F, String] = {
    val topicStream = Stream.eval(fs2.async.topic[F, String]("Topic start"))

    topicStream.flatMap { topic =>
      val publisher = Stream.emit("1").repeat.covary[F].to(topic.publish)

      val subscriber = topic.subscribe(10).take(4)

      subscriber.concurrently(publisher)
    }

  }

  val resultTopic = withTopic[IO].compile.toVector.unsafeRunSync()
  println(s"Topic >> $resultTopic") // prints Topic >> Vector(Topic start, 1, 1, 1)

}

Each call to flatMap in this example creates a new topic with its own subscribers and publishers. This results in a familiar problem: data isn’t shared across topics.

The solution is simple: store your Topic in a structure we can share across our clients, for example a Map:

val topicMap: mutable.Map[String, Topic[F, String]] = mutable.Map.empty

but there are some nuances to it. Let’s see some working code first:

import fs2._
import cats.effect._
import fs2.async.mutable.Topic

import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global

object SampleCode extends App {

  val topicMap: mutable.Map[String, Topic[IO, String]] = mutable.Map.empty

  //we create and store topic before we call publishers and subscribers
  val topicStream = Stream
    .eval(fs2.async.topic[IO, String]("Topic start"))
    .map(topic => { topicMap += ("k" -> topic); topic })
    .compile
    .drain
    .unsafeRunSync()

  def addPublisher(id: String): Stream[IO, Unit] = {
    val topic = topicMap("k")
    Stream.emit(id).covary[IO].repeat.to(topic.publish)
  }

  def addSubscriber: Stream[IO, String] = {
    val topic = topicMap("k")

    topic
      .subscribe(10)
      .take(4)
  }

  val publisher     = addPublisher("1")
  val subscriber    = addSubscriber
  val joinedStreams = subscriber.concurrently(publisher)

  val output = joinedStreams.compile.toVector.unsafeRunSync()
  println(s"Topic >> $output") // prints Topic >> Vector(1, 1, 1, 1)

}

If we read the implementation of topicStream we will notice that we store the topic in the map as soon as we create it, in a map operation with a side effect (sorry). The reason we do this here and not when calling addPublisher is concurrency. If we create the topic inside the addPublisher method, it may not be in the map when addSubscriber tries to read from it. Of course it is not always possible to create the topics in advance, but this means your subscribers must be ready to wait or retry until a topic is available.
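One way a subscriber could "wait or retry" is to poll the shared map a bounded number of times. The following is a rough, blocking sketch using only the standard library. The lookup helper, its parameters and the use of TrieMap (a thread-safe map, unlike the plain mutable.Map above) are assumptions for illustration, not part of fs2.

```scala
import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap

object WaitForKey {
  // Hypothetical helper: look the key up, pausing between attempts,
  // and give up with None once the attempts are exhausted.
  @tailrec
  def lookup[K, V](m: scala.collection.Map[K, V], key: K, attempts: Int, pauseMs: Long): Option[V] =
    m.get(key) match {
      case found @ Some(_)       => found
      case None if attempts <= 1 => None
      case None =>
        Thread.sleep(pauseMs) // blocking pause, acceptable only in a sketch
        lookup(m, key, attempts - 1, pauseMs)
    }

  def main(args: Array[String]): Unit = {
    val topics = TrieMap("k" -> "topic placeholder")
    println(lookup(topics, "k", attempts = 3, pauseMs = 10L))       // Some(topic placeholder)
    println(lookup(topics, "missing", attempts = 2, pauseMs = 10L)) // None
  }
}
```

In real code the pause would more likely be expressed as an effect rather than Thread.sleep, but the shape of the retry is the same.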

Another detail on topicStream which can be easy to miss is the fact that we compile and run the stream. Without this step, the topic won’t be created! Just calling fs2.async.topic[IO, String]("Topic start") is not enough, and as we want the topic stored in the map before we add publishers and subscribers, we must run it now.

The last relevant piece of code is joinedStreams. Both addPublisher and addSubscriber create streams using the stored topic. As with the streams in our queue example, we must run these streams concurrently, otherwise data won’t flow correctly. We can verify this by removing joinedStreams and replacing publisher and subscriber with:

  val publisher  = addPublisher("1").compile.drain.runAsync(_ => IO.unit)
  val subscriber = addSubscriber.compile.toVector.runAsync(e => IO(println(s"Topic >> $e")))

Despite running asynchronously and having an infinite stream as publisher, the process will hang in the subscriber, waiting for data.

Conclusions

We have reviewed some possible pitfalls we may encounter when we start working with fs2. We have provided sample code to demonstrate the problematic code snippets as well as the solution for these issues. Hopefully this will help with the transition to http4s v0.18, especially when working with WebSockets!

Acknowledgements

Thanks to Danielle Ashley, Dave Gurnell and Richard Dallaway for their comments on the draft. Thanks to the fs2 Gitter channel and especially to Fabio Labella for their help understanding several fs2 concepts. And thanks to Underscore for allowing me to publish this here :)

