How to create beautiful pipelines on Elixir with Opus edit

I am not sure if Zorbash had the idea to create Opus while we worked at the same project at Quiqup, but one thing I can be sure, his idea saved us avoiding hard to read modules full of business logic orchestration. "What do you mean?, Elixir has an already beautiful syntax", you may say. And I agree, but sometimes we can just push it a bit more.

Use case

In Quiqup we have a business model that requires our drivers to perform different kinds of work. For example, depending on who is ordering, our drivers might need to just collect an pre-ordered food order from a restaurant or walk into a store and pick many items. Also, sometimes they need to pay at the venue on behalf of the customer and if so, we need to let them know. Those and, believe me, lots of more decisions in just one step of each delivery.

Facing this situation, we decided to create pipeline modules where we could read them and know, without going deep in the implementation, how the working flow for such decisions works. Like a poem. We wanted to write poem modules.

This is where the with statement shines. We came up with something like:

defmodule Quiqup.DispatchOrderStagePipeline do
  def call(%{order_id: order_id}) do
    with {:ok, %{courier: courier} = order} <- validate_order(order),
         {:ok, true} <- courier_available?(courier),
         {:ok, next_stage_slug} <- next_stage_slug_by(order),
         {:persist_stage, {:ok, %Stage{} = stage}}
             <- {:persist_stage, %{order: order} |> create_stage(next_stage_slug)},
         {:push_notification, :ok}
             <- {:push_notification, push_notification(%{courier: courier, stage: stage})},
         {:emit_event, :ok}
             <- {:emit_event, emit_event(%{courier: courier, stage: stage})} do
        {:ok, stage}
    else
        e -> {:error, e}
    end
  end
end

Ermm... not really pretty. Although it is possible to read it after spending some seconds, we can imagine how awful it would be to maintain and make it bigger in a near future as soon as our business requirements get more complex. Not to mention that functions like create_next_stage? and create_stage may have a lot of decisions by themselves.

Opus for the rescue

As mentioned in Opus's readme, the inspiration came from dry.rb (transaction) and trailblazer (operation) to use a railway oriented programming approach.

In summary, the equivalent of that code example mentioned above using Opus would be something like:

defmodule Quiqup.DispatchOrderStagePipeline do
  use Opus.Pipeline

  check :validate_order
  step :assign_next_stage_slug
  step :assign_courier
  check :courier_available?
  step :persist_stage
  link PushNotificationPipeline
  tee :emit_event
end

Before digging into each line of the code above, let's understand the basics.

How Opus works

Note: this article was written for the version 0.5.1

Opus's core aspect is Opus.Pipeline and you can use into a module so that it works as a pipeline. By pipeline, it means that the initial input we need to provide will pass through different stages inside this pipeline, and each stage will provide an output to be the input of the next stage.

Take a look at this image extracted from the official docs:

Also, it worth mentioning that a pipeline is not a process and it does not hold state.

By default, a pipeline will not leak any exception that might be raised inside it. Instead, it will only return either {:ok, any()} or {:error, %Opus.PipelineError{}}. But you can change this behaviour by making Opus leak either any exceptions or specific ones, for the entire pipeline or for specific stages. More details about the implementation below.

When an {:error, %Opus.PipelineError{}} is returned, it means that Opus stopped at a certain stage and didn't run the next ones.

To call the pipeline and inserting the initial output, you call the function call/1 implemented by Opus on your module when you use Opus.Pipeline in the module:

Quiqup.DispatchOrderStagePipeline.call(%{order: order})

After, Opus will call the first stage. In our example, it is the check stage. Each stage has its own way to work (more details below).

The stages

In general, each stage will receive an atomized function name to be executed (eg. :validate_order). All those functions must be implemented with an arity of 1, and the given argument will be the current pipeline data. It is convenient to insert either a struct or a map to the pipeline.

That said, this is how it works:

defmodule Quiqup.DispatchOrderStagePipeline do
  use Opus.Pipeline

  check :validate_order
  step :persist_stage

  def validate_order(%{order: order} = pipeline) do
    # validation code returns true or false
  end

  def persist_stage(%{order: order} = pipeline) do
    # persist code ...
  end
end

{:ok, _} = Quiqup.DispatchOrderStagePipeline.call(%{order: order})

Talking briefly about each stage in our main example above (you can check out all the details in the official doc):

check

The function must return a boolean. If true, Opus will allow the pipeline to continue and will call the next stage giving the same pipeline data as it was given to this stage. If false, Opus will halt the pipeline and will return a tuple {:error, %Opus.PipelineError{}}.

You may choose to pass an error_message: atom | String.t option to this stage in order to match the validation error, eg: check :valid_params?, error_message: :invalid_initial_input or to provide an elegant error to be used, eg: check :valid_params?, error_message: "Invalid input. Expected :order to be given.". The given value will be returned inside %Opus.PipelineError{}.

step

The function will perform any action and will return a new pipeline data to be received by the next stage. That said, you can return the same received pipeline with more data in it, or a new map with another data and the initial input will be lost.

This stage is also useful to just add new data to the pipeline without making anything special. In our example, we have step :assign_courier which is doing that, like this:

defmodule Quiqup.DispatchOrderStagePipeline do
  use Opus.Pipeline

  step :assign_courier
  step :next_stage

  def assign_courier(%{order: order} = pipeline) do
      put_in(pipeline, [:courier], order.courier)
  end

  def next_stage(%{order: order, courier: courier})
end

Quiqup.DispatchOrderStagePipeline.call(%{order: order})

This is useful for decoupling the access of something in the pipeline or to add a flag to help you to take decisions on the next stages, like:

defmodule Quiqup.DispatchOrderStagePipeline do
  use Opus.Pipeline

  step :assign_availability
  step :persist, if: :available?
  step :send_alert, if: :persisted?

  def assign_availability(pipeline) do
      put_in(pipeline, [:availability], :unavailable)
  end

  def persist(%{order: order} = pipeline) do
      stage = persist_stage(order)
    put_in(pipeline, [:stage], stage)
  end

  def send_alert(), do: #something

  def available?(%{availability: :available}), do: true
  def available?(_), do: false

  def persisted?(%{stage: _}), do: true
  def persisted?(_), do: false
end

Quiqup.DispatchOrderStagePipeline.call(%{order: order})

Receives another Opus pipeline module. This is useful when you want to perform something more complex at any time in the pipeline and you want to use another pipeline for that. Opus will call its .call/1 function and the return of the last stage of this module will be the new pipeline data for the current module.

This was important for us because we could perform a different kind of persistence depending on the slug of the next order stage. So, instead of having a common step :persist_stage we came up with something like:

defmodule Quiqup.DispatchOrderStagePipeline do
  # ...
  link PersistCollectStagePipeline, if: &PersistCollectStagePipeline.call?/1
  link PersistPickStagePipeline, if: &PersistPickStagePipeline.call?/1
  link PersistPayStagePipeline, if: PersistPayStagePipeline.call?/1
  # ...
end

And this call?/1 (which can be confusing) was implemented by us in the pipeline, so we could use the if option that Opus gives us (more details below) to check if we should call that specific persistence, based on what we had in the current pipeline data. Example:

defmodule Quiqup.PersistCollectStagePipeline do
  step :persist_stage

  def call?(%{order: order}) do
    # check stuff and return true or false
  end

  def persist_stage(%{order: order})
end

tee

The same as step but if anything bad happens, Opus won't halt the pipeline or return any {:error, _}. This is useful when you want to perform something that might be wrong, but you don't want to treat it as an error. Instead, you can log it somewhere and move on.

In our case, we wanted to call an external service, and because nothing is available 100% of the time, we don't want to halt our pipeline or return an error for something we can try again later on.

Stage options

Opus gives you some incredible options on each stage, such as performing something with an anonymous function, or using conditionals to only perform a certain stage when the conditional is true (as we saw above).

You should check out the official documentation for that also.

Joining all together, here is what you can do:

defmodule Quiqup.DispatchOrderStagePipeline do
  use Opus.Pipeline
  import Quiqup.Notifier, only: [emit_event: 1]

  check :valid_input?, with: &match?(%{order: _}, &1)
  check :validate_order, error_message: :order_is_not_valid
  step :assign_next_stage_slug, raise: true, if: :should_assign?
  step :assign_courier, instrument?: false
  step :persist_stage, raise: [PersistanceError]
  step :push_notification, retry_times: 3, retry_backoff: fn -> lin_backoff(10, 2) |> cap(100) end
  tee :emit_event, with: &emit_event(&1)
end

Briefly, we used:

  • with: accepts an anonymous function to be executed, avoiding the need for defining that function name you gave as the argument;
  • error_message: since Opus has its own convention to create the error message when receiving {:error, %Opus.PipelineError{}}, you can define your own error message for a stage;
  • raise: Opus won't raise any error unless you allow it. That said, you can give a value true (default is false), then Opus won't catch any raised exception for that stage, or you can give a list of exceptions so Opus won't catch them either.

This option can be given for the use macro, like:

defmodule Quiqup.DispatchOrderStagePipeline do
  use Opus.Pipeline, raise: true
  ...
end
  • instrument?: if you want to disable instrumentation (more below), set it to false (default is true).
  • if: only executes that stage when a given conditional is true.
  • retry_times: super useful when you need to perform something external that might not be available. It received an integer.
  • retry_backoff: it's the delay between each retry. Works together with retry_times and it receives either an atom related to a function or an anonymous function. This function must return an Enumerable.t with as many numbers as retries given to retry_times. Opus uses ElixirRetry under the hood, which allows you to use functions like lin_backoff/2 and cap/2.

Calling a specific stage

For testing purposes, it is super helpful to call a specific stage or call the entire pipeline except for a certain stage. For example, in our case, we use to split the pipeline into two or more blocks and test only that part. And thankfully Opus gives us this power:

Quiqup.DispatchOrderStagePipeline.call(%{order: order}, only: [:valid_input?])

Quiqup.DispatchOrderStagePipeline.call(%{order: order}, except: [:push_notification, :emit_event])

Instrumentation

This is probably one of the most amazing features of Opus. It gives us the power to create instrumentation) inside of our pipeline or completely decoupled.

Basically, there are three hooks, before_stage, stage_skipped, stage_completed, pipeline_started and pipeline_completed, and you can "listen" to them inside the module, as:

defmodule Quiqup.DispatchOrderStagePipeline do
  step :persist

  instrument :stage_completed, %{stage: %{name: :persist}}, fn %{time: time} ->
    # send to the monitoring tool of your choice
  end
end

or create an instrumentation module by adding it to your config/*.exs, as:

config :opus, :instrumentation, [PipelineInstrumentation]

defmodule PipelineInstrumentation do
  def instrument(:pipeline_started, %{pipeline: Quiqup.DispatchOrderStagePipeline}, %{input: input}) do
    # publish the metrics to a specific backend
  end

  def instrument(:stage_completed, %{stage: %{pipeline: Quiqup.DispatchOrderStagePipeline}}, %{time: time}) do
    # publish the metrics to a specific backend
  end
end

You also can disable the instrumentation for a certain stage as mentioned in the options above, or for the entire module, like:

defmodule Quiqup.DispatchOrderStagePipeline do
  use Opus.Pipeline, instrument?: false
  ...
end

Check out the documentation for more useful examples on instrumentations.

Graph

At last, Opus implements a pipeline flow visualization with the help of Graphviz. As mentioned above, we have this complex business logic for creating tasks to our beloved couriers and using this feature is super helpful as an attachment to the internal documentation of our products.

Basically, you just need to run Opus.Graph.generate(:your_app) and all the magic happens. This command will return a statused tuple with the generated image.

This is how it looks like:

Check out the documentation to see another example.

What's next?

Opus has 0.6.0 on the way to be published soon and it is going to add a new stage called skip. And this is how it's gonna work:

defmodule Quiqup.DispatchOrderStagePipeline do
  use Opus.Pipeline
  ...
  skip if: :job_is_finished?
  ...

  @spec job_is_finished?(map()) :: bool()
end

Instead of receiving a function name, it will receive an if: :function, which makes skip more of a flow control than a stage itself. If this function returns true, then the pipeline is halted but a {:ok, :skipped} is returned by Opus.

Basically, it should be used when you want to check but don't want to return an error when the check fails. The next stage will receive the current pipeline data, as it happens on check stage.

If you want to play with it, you can try to install Opus referencing the master branch.

Conclusion

Opus helped us to write readable, decoupled and maintainable pipeline modules and I can't think on how we could grow our logic and implement them in such a clean and easy way as Opus provide us.

It would be nice to see another use cases and thoughts around Opus, so if you play with it at any time, share some code with us!

Thanks, Zorbash, for helping me out by reviewing the article!

Happy code and long life to the alchemy 😉