GenStage

Motivation - Sending events between Elixir processes with back-pressure in a producer -> producer_consumer (transform) -> consumer pipeline

  • Producer - Receives demand (from consumers) and generates events

  • Producer_Consumer - Doesn't handle demand itself, but transforms the event data passing through it

  • Consumer - Receives the final events

Demand is generated automatically; you just need to start_link each stage and then sync_subscribe them to one another. To parallelize, start and subscribe multiple stages of the same type.

Producer

alias Experimental.GenStage

defmodule A do
  use GenStage

  # Registers this stage as a producer; the state is the next integer to emit.
  def init(counter), do: {:producer, counter}

  # Serves a demand of `demand` events with consecutive integers starting
  # at `counter`. Example: with a counter of 3 and a demand of 2, the
  # events 3 and 4 are emitted and the state becomes 5.
  def handle_demand(demand, counter) when demand > 0 do
    next_counter = counter + demand
    events = Enum.to_list(counter..(next_counter - 1))

    # Reply shape: the emitted events come second, the new state third.
    {:noreply, events, next_counter}
  end
end

Producer_Consumer

alias Experimental.GenStage

defmodule B do
  use GenStage

  # Registers this stage as a producer_consumer; the state is the multiplier.
  def init(number), do: {:producer_consumer, number}

  # Multiplies every incoming event by `number` and emits the results.
  # The state (the multiplier) is passed through unchanged.
  def handle_events(events, _from, number) do
    scaled = for event <- events, do: event * number
    {:noreply, scaled, number}
  end
end

Consumer

alias Experimental.GenStage

defmodule C do
  use GenStage

  # Registers this stage as a consumer; the state is the pause (passed to
  # Process.sleep/1) taken after each batch of events.
  def init(sleeping_time), do: {:consumer, sleeping_time}

  # Prints the received batch, pauses for `sleeping_time`, and emits nothing.
  def handle_events(events, _from, sleeping_time) do
    # Show the batch on the terminal.
    IO.inspect(events)

    # Pause for the configured time before returning.
    Process.sleep(sleeping_time)

    # Consumers never emit, hence the empty event list.
    {:noreply, [], sleeping_time}
  end
end

Connect

# Start one stage of each kind.
{:ok, producer} = GenStage.start_link(A, 0)
# ^ counts up from zero
{:ok, multiplier} = GenStage.start_link(B, 2)
# ^ multiplies every event by 2
{:ok, printer} = GenStage.start_link(C, 1000)
# ^ sleeps for a second after each batch

# Wire the pipeline from the consumer end back towards the producer.
GenStage.sync_subscribe(printer, to: multiplier)
GenStage.sync_subscribe(multiplier, to: producer)

# Keep the calling process alive so the printed events can be seen.
Process.sleep(:infinity)

Parallelize

{:ok, producer} = GenStage.start_link(A, 0)
# ^ counts up from zero
{:ok, multiplier} = GenStage.start_link(B, 2)
# ^ multiplies every event by 2

# Start four identical consumers, each sleeping for a second per batch.
consumers =
  for _ <- 1..4 do
    {:ok, consumer} = GenStage.start_link(C, 1000)
    consumer
  end

# Subscribe every consumer to the multiplier first, then connect the
# multiplier to the producer.
Enum.each(consumers, fn consumer ->
  GenStage.sync_subscribe(consumer, to: multiplier)
end)

GenStage.sync_subscribe(multiplier, to: producer)

# Keep the calling process alive so the printed events can be seen.
Process.sleep(:infinity)

Last updated