Genstage

Motivation - Sending events between Elixir processes with back-pressure in a producer -> producer_consumer(transform) -> consumer

  • Producer - recieve demand(from consumer) and generate events

  • Producer_Consumer - Doesn't handle demand, but transform events data

  • Consumer - Recieve final events

Demand is auto generated and you just need to start_link => sync_subscribe multiple of the same type to parallelize

Producer

alias Experimental.GenStage

defmodule A do
  use GenStage

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    events = Enum.to_list(counter..counter+demand-1)

    # The events to emit is the second element of the tuple,
    # the third being the state.
    {:noreply, events, counter + demand}
  end
end

Producer_Consumer

Consumer

Connect

Parallelize

Last updated