Motivation - Sending events between Elixir processes with back-pressure, through a producer -> producer_consumer (transform) -> consumer pipeline
Producer - Receives demand (from consumers) and generates events
Producer_Consumer - Has no handle_demand callback; it transforms the events it receives and passes them on
Consumer - Receives the final events
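Demand is generated automatically once stages are subscribed, so you only need to start_link each stage and then sync_subscribe them. To parallelize, start several stages of the same type and subscribe each of them.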
Producer
    alias Experimental.GenStage

    defmodule A do
      use GenStage

      def init(counter) do
        {:producer, counter}
      end

      def handle_demand(demand, counter) when demand > 0 do
        # If the counter is 3 and we ask for 2 items, we will
        # emit the items 3 and 4, and set the state to 5.
        events = Enum.to_list(counter..counter+demand-1)

        # The events to emit is the second element of the tuple,
        # the third being the state.
        {:noreply, events, counter + demand}
      end
    end
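The arithmetic in handle_demand can be checked without starting any processes by calling the same body as a plain function. This is only a minimal sketch: the module name DemandSketch is hypothetical, and GenStage itself is not involved.

```elixir
# Hypothetical stand-alone module (not part of GenStage); it mirrors the
# body of A.handle_demand/2 so the state transition can be inspected directly.
defmodule DemandSketch do
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

# The counter is 3 and the consumer asks for 2 items.
{:noreply, events, new_counter} = DemandSketch.handle_demand(2, 3)
IO.inspect(events)       # [3, 4]
IO.inspect(new_counter)  # 5
```

The returned state (5) is where the next batch starts, so consecutive demands never emit the same number twice.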
Producer_Consumer
    alias Experimental.GenStage

    defmodule B do
      use GenStage

      def init(number) do
        {:producer_consumer, number}
      end

      def handle_events(events, _from, number) do
        events = Enum.map(events, & &1 * number)
        {:noreply, events, number}
      end
    end
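The transform step can likewise be tried on a single batch as a plain function. This is a rough sketch rather than a running stage: TransformSketch is a hypothetical name, and nil stands in for the _from argument that GenStage would normally supply.

```elixir
# Hypothetical stand-alone module (not part of GenStage); it mirrors the
# body of B.handle_events/3.
defmodule TransformSketch do
  def handle_events(events, _from, number) do
    events = Enum.map(events, &(&1 * number))
    {:noreply, events, number}
  end
end

# A batch of three events passing through a stage whose state is the multiplier 2.
# nil is a placeholder for the _from argument, which the function ignores.
{:noreply, out, state} = TransformSketch.handle_events([0, 1, 2], nil, 2)
IO.inspect(out)    # [0, 2, 4]
IO.inspect(state)  # 2
```

The state is returned unchanged, since the multiplier is fixed when the stage is started.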
Consumer
    alias Experimental.GenStage

    defmodule C do
      use GenStage

      def init(sleeping_time) do
        {:consumer, sleeping_time}
      end

      def handle_events(events, _from, sleeping_time) do
        # Print events to terminal.
        IO.inspect(events)

        # Sleep the configured time.
        Process.sleep(sleeping_time)

        # We are a consumer, so we never emit events.
        {:noreply, [], sleeping_time}
      end
    end
Connect
    {:ok, a} = GenStage.start_link(A, 0)    # starting from zero
    {:ok, b} = GenStage.start_link(B, 2)    # multiply by 2
    {:ok, c} = GenStage.start_link(C, 1000) # sleep for a second

    GenStage.sync_subscribe(c, to: b)
    GenStage.sync_subscribe(b, to: a)

    # Sleep so we see events printed.
    Process.sleep(:infinity)
Parallelize
    {:ok, a} = GenStage.start_link(A, 0)     # starting from zero
    {:ok, b} = GenStage.start_link(B, 2)     # multiply by 2

    {:ok, c1} = GenStage.start_link(C, 1000) # sleep for a second
    {:ok, c2} = GenStage.start_link(C, 1000) # sleep for a second
    {:ok, c3} = GenStage.start_link(C, 1000) # sleep for a second
    {:ok, c4} = GenStage.start_link(C, 1000) # sleep for a second

    GenStage.sync_subscribe(c1, to: b)
    GenStage.sync_subscribe(c2, to: b)
    GenStage.sync_subscribe(c3, to: b)
    GenStage.sync_subscribe(c4, to: b)
    GenStage.sync_subscribe(b, to: a)

    # Sleep so we see events printed.
    Process.sleep(:infinity)