Robust job processing in Elixir, backed by modern PostgreSQLFeatures
Scheduled Jobs - at any time in future down to second
Queue Control - start, stop, scale independently
See the Oban.Worker
docs for more details on failure conditions and Oban.Telemetry
for details on job reporting.
Usage
Defining Workers
Must define perform/1
function with %Oban.Job{} struct
args will always have string keys cuz serialization
Successful jobs return :ok
or {:ok, value}
to treat as success, return other for failure, discard, deferred
Copy defmodule MyApp . Business do
use Oban . Worker ,
queue: :events ,
priority: 3 ,
max_attempts: 3 ,
tags: [ "business" ] ,
unique: [period: 30 ] #in seconds
@impl Oban . Worker
def perform (% Oban . Job {args: %{ "id" => id} = args}) do
#work hard grrrrr
:ok
end
end
Enqueueing Jobs
Jobs are enqueued by inserting Ecto struct into db. All workers define a new/2
function that serves as changest.
Copy %{id: 1 , in_the: "business" , of_doing: "business" }
|> MyApp . Business . new ()
|> Oban . insert ()
At specific future time
Copy %{id: 1 }
|> MyApp . Business . new (scheduled_at: ~U[2020-12-25 19:00:56.0Z] )
|> Oban . insert ()
Periodic Jobs
Has one minute resolution(will trigger within a minute)
Copy config :my_app , Oban , repo: MyApp . Repo , crontab: [
{ "* * * * *" , MyApp . MinuteWorker } ,
{ "0 * * * *" , MyApp . HourlyWorker , args: %{custom: "arg" }} ,
{ "0 0 * * *" , MyApp . DailyWorker , max_attempts: 1 } ,
{ "0 12 * * MON" , MyApp . MondayWorker , queue: :scheduled , tags: [ "mondays" ]} ,
{ "@daily" , MyApp . AnotherDailyWorker }
]
Jobs are considered unique for most of each minute, which prevents duplicate jobs with multiple nodes and across node restarts.
Unique Jobs
Uniqueness is based on a combination of args
, queue
, worker
, state
and insertion time.
Can be configured for worker
Copy use Oban . Worker , unique: [period: 60 ] #in seconds
Or for specific job
Copy %{email: "brewster@example.com" }
|> MyApp . Mailer . new (unique: [period: 300 , fields: [:queue , :worker])
|> Oban . insert ()
Setup
1) Install oban dep
2) Migration
Copy mix ecto.gen.migration add_oban_jobs_table
Place the follow in the newly created migration
Copy use Ecto . Migration
def up do
Oban . Migrations . up ()
end
# We specify `version: 1` in `down`, ensuring that we'll roll all the way back down if
# necessary, regardless of which version we've migrated `up` to.
def down do
Oban . Migrations . down (version: 1 )
end
3) Config
Config.exs
Copy config :my_app , Oban ,
repo: MyApp . Repo ,
plugins: [ Oban . Plugins . Pruner ] ,
queues: [default: 10 , events: 50 , media: 20 ]
application.ex
Copy def start ( _type , _args ) do
children = [
Repo ,
Endpoint ,
{ Oban , oban_config ()}
]
Supervisor . start_link (children , strategy: :one_for_one , name: MyApp . Supervisor )
end
# Conditionally disable crontab, queues, or plugins here.
defp oban_config do
Application . get_env (:my_app , Oban )
end