Understanding GenStage in Elixir

GenServers are an amazing part of the OTP ecosystem, and they make a lot of complicated tasks easy to solve in Elixir. But wiring several sets of GenServers together and exchanging data between them has always been a hassle. GenEvent solved these problems to some extent, but it did not provide support for back-pressure or concurrency. This is where GenStage comes in.

Announced last year on the official Elixir blog, GenStage is a new behaviour in the style of the OTP behaviours, built on top of GenServer. It is described as:

GenStage is a specification for exchanging events with back-pressure between Elixir producers and consumers

but that doesn’t convey its full power. The name hides the real meaning: instead of simple producers and consumers, imagine concurrent stages in a pipeline, each of which takes an arbitrary number of input events, performs some operations on them and sends the results to the next stage. Each stage can consume and produce many events at once, with multiple instances of each stage running at the same time. Consumers ask for events according to their own demand, and producers emit only as many events as have been asked for, with the behaviour handling the bulk of the logic.

A simple use case of GenStage is a straightforward pipeline like this:

[Figure: GenStage simple use case]
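As a rough sketch of such a pipeline in code, the script below wires a producer, a producer_consumer and a consumer together. The module names (`Counter`, `Doubler`, `Sink`), the doubling step and the demand values are all assumptions made for illustration, and the `Mix.install` line assumes you run this as a standalone script with access to Hex.

```elixir
# Standalone script: Mix.install fetches GenStage from Hex (Elixir 1.12+).
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Counter do
  use GenStage

  def start_link(initial), do: GenStage.start_link(__MODULE__, initial, name: __MODULE__)

  @impl true
  def init(counter), do: {:producer, counter}

  # Emit exactly as many integers as the downstream stage asked for.
  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Doubler do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  # Subscribes to the previous stage from init/1.
  @impl true
  def init(:ok), do: {:producer_consumer, :ok, subscribe_to: [{Counter, max_demand: 10}]}

  # Transform each incoming event and pass the results downstream.
  @impl true
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &(&1 * 2)), state}
  end
end

defmodule Sink do
  use GenStage

  def start_link(report_to), do: GenStage.start_link(__MODULE__, report_to)

  @impl true
  def init(report_to), do: {:consumer, report_to, subscribe_to: [{Doubler, max_demand: 10}]}

  # Consumers never emit events, so the second element is always [].
  @impl true
  def handle_events(events, _from, report_to) do
    send(report_to, {:events, events})
    # Simulated slow work; back-pressure keeps the producer from racing ahead.
    Process.sleep(100)
    {:noreply, [], report_to}
  end
end

# Start the stages in order: producer first, consumer last.
{:ok, _} = Counter.start_link(0)
{:ok, _} = Doubler.start_link(:ok)
{:ok, _} = Sink.start_link(self())

receive do
  {:events, events} -> IO.inspect(events, label: "first batch")
after
  1_000 -> IO.puts("no events received")
end
```

Because the sink is deliberately slow, the counter only advances as fast as the sink asks for more events, which is the back-pressure the description refers to.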

But that doesn’t do it justice. Instead, you should think about each stage having many concurrent instances, like this:

[Figure: GenStage concurrent instances]
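One simple way to get several concurrent instances of a stage is to start the same consumer module more than once and let every instance subscribe to the same producer. The sketch below assumes hypothetical `NumberSource` and `Worker` modules and arbitrary demand values; it relies on the producer's default dispatcher handing each event to exactly one subscriber.

```elixir
# Standalone script: Mix.install fetches GenStage from Hex (Elixir 1.12+).
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule NumberSource do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, 0, name: __MODULE__)

  @impl true
  def init(next), do: {:producer, next}

  # Emit as many consecutive integers as were demanded.
  @impl true
  def handle_demand(demand, next) when demand > 0 do
    {:noreply, Enum.to_list(next..(next + demand - 1)), next + demand}
  end
end

defmodule Worker do
  use GenStage

  def start_link(report_to), do: GenStage.start_link(__MODULE__, report_to)

  # Every worker instance subscribes to the same producer.
  @impl true
  def init(report_to) do
    {:consumer, report_to, subscribe_to: [{NumberSource, max_demand: 4, min_demand: 2}]}
  end

  @impl true
  def handle_events(events, _from, report_to) do
    # Simulated work, so the instances visibly overlap in time.
    Process.sleep(50)
    send(report_to, {:processed, self(), events})
    {:noreply, [], report_to}
  end
end

{:ok, _} = NumberSource.start_link(:ok)

# Three concurrent instances of the same consumer stage.
for _ <- 1..3, do: {:ok, _} = Worker.start_link(self())

# Collect the first six processed batches and see who handled them.
batches =
  for _ <- 1..6 do
    receive do
      {:processed, worker, events} -> {worker, events}
    after
      1_000 -> raise "timed out waiting for workers"
    end
  end

workers_seen = batches |> Enum.map(&elem(&1, 0)) |> Enum.uniq()
IO.puts("#{length(workers_seen)} workers handled #{length(batches)} batches")
```

In a real application these processes would live under a supervision tree rather than being started from a script.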

This is possible due to another behaviour shipped with the library, called ConsumerSupervisor. A ConsumerSupervisor subscribes to one or more producers as a consumer and starts a supervised child process for each event it receives, which gives you concurrency along with better reliability and fault tolerance. For code examples, I would suggest you check out the project on GitHub, the excellent documentation on HexDocs or the Flow library (which is built on top of GenStage and is an amazing showcase of it!). But there are some things you should take care of when working with GenStage in Elixir (that I had to learn the hard way):

  • Don’t produce events when there is no demand
  • If a long-running producer yields zero events while there is consumer demand, you have to explicitly retry producing events yourself. Otherwise the pipeline just stops, because consumers do not ask again until they have received events. A common case where this happens is integration with external message queues
  • Use Supervisor for simple pipelines, and ConsumerSupervisor when dealing with concurrent instances of a GenStage
  • Try to define explicit demand for consumers (for example, max_demand and min_demand on the subscription)
  • Start the stages in order, and have consumers and producer_consumers subscribe to the previous stage immediately, preferably through the subscribe_to option returned from the init/1 callback
  • Maintaining the overall pending demand in a producer’s internal state is a good idea
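Several of these points can be sketched in a single producer: it keeps the pending demand in its state, emits nothing when there is no demand, and schedules a retry when the external source comes back empty. Everything here is an assumption for illustration: the `PollingProducer` and `Forwarder` modules, the 100 ms retry interval, and the `fetch` function, which stands in for a real message queue client and is backed by a plain Agent.

```elixir
# Standalone script: Mix.install fetches GenStage from Hex (Elixir 1.12+).
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule PollingProducer do
  use GenStage

  @retry_ms 100

  # `fetch` is a hypothetical stand-in for a queue client: it takes the number
  # of wanted messages and returns a (possibly empty) list of them.
  def start_link(fetch), do: GenStage.start_link(__MODULE__, fetch, name: __MODULE__)

  @impl true
  def init(fetch), do: {:producer, %{fetch: fetch, pending: 0, retry_scheduled: false}}

  # Accumulate demand in the state instead of assuming it can be met at once.
  @impl true
  def handle_demand(demand, state) do
    produce(%{state | pending: state.pending + demand})
  end

  # The retry timer fired: try again to satisfy the outstanding demand.
  @impl true
  def handle_info(:retry, state), do: produce(%{state | retry_scheduled: false})
  def handle_info(_other, state), do: {:noreply, [], state}

  # No demand, so produce nothing.
  defp produce(%{pending: 0} = state), do: {:noreply, [], state}

  defp produce(%{pending: pending, fetch: fetch} = state) do
    events = Enum.take(fetch.(pending), pending)
    state = %{state | pending: pending - length(events)}
    {:noreply, events, schedule_retry(state)}
  end

  # Only retry while demand is unmet, and keep at most one timer in flight.
  defp schedule_retry(%{pending: 0} = state), do: state
  defp schedule_retry(%{retry_scheduled: true} = state), do: state

  defp schedule_retry(state) do
    Process.send_after(self(), :retry, @retry_ms)
    %{state | retry_scheduled: true}
  end
end

defmodule Forwarder do
  use GenStage

  def start_link(report_to), do: GenStage.start_link(__MODULE__, report_to)

  # Explicit demand, and subscription to the producer straight from init/1.
  @impl true
  def init(report_to) do
    {:consumer, report_to, subscribe_to: [{PollingProducer, max_demand: 10, min_demand: 5}]}
  end

  @impl true
  def handle_events(events, _from, report_to) do
    send(report_to, {:events, events})
    {:noreply, [], report_to}
  end
end

# An Agent plays the role of the external queue; it starts out empty.
{:ok, queue} = Agent.start_link(fn -> [] end)
fetch = fn n -> Agent.get_and_update(queue, &Enum.split(&1, n)) end

{:ok, _} = PollingProducer.start_link(fetch)
{:ok, _} = Forwarder.start_link(self())

# Demand has already arrived, but messages only show up later.
Process.sleep(50)
Agent.update(queue, &(&1 ++ [:a, :b, :c]))

received =
  receive do
    {:events, events} -> events
  after
    1_000 -> []
  end

IO.inspect(received, label: "delivered after retry")
```

Without the retry timer, the three messages would sit in the queue forever: the consumer has already sent its demand and will not ask again until it has received something.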

Here are some other excellent posts on the topic: