Concurrency Abstractions in Elixir

Written by: Leigh Halliday

7 min read

Elixir gives you the tools to easily write concurrent code. In an earlier post, I introduced concurrency in Elixir by looking at the building blocks of concurrency. Processes follow the actor model of concurrency and are the core underlying construct with which you can then send and receive messages between other processes.

What you might have seen is that there was a fair amount of boilerplate involved in doing common tasks, such as wanting to keep track of some state within a process or making a blocking call and waiting for a response.

Thankfully Elixir comes with a number of common abstractions to make writing concurrent code even easier. In this article we will be looking at Task, Agent, and GenServer.

Task

The Task module makes doing work concurrently almost effortless. There are only two methods you tend to work with: async to begin asynchronous work done in another process, and await which waits for that work to finish and provides you with the result.

Let's look at a very simple example to see it in action, and then we'll look at how it might be used to speed up a map by doing it in parallel.

task = Task.async(fn -> 5 + 5 end)

The above line will process the anonymous function in a separate process. You are free to continue on your way, and as long as you have the reference to task handy, you can access the result of the anonymous function. What the async function returns is basically a reference to the PID of the process running this task, which is needed to fetch the result:

%Task{owner: #PID<0.80.0>, pid: #PID<0.82.0>, ref: #Reference<0.0.7.207>}

Let's check on the pid of the task to see what is happening with that process:

Process.info(task.pid)
nil
Process.alive?(task.pid)
false

As you can see, that process is no longer alive...as soon as the Task has finished its work, that process is shut down. So how do we access the response from it then? The key is to look in our own mailbox (the calling process):

Process.info(task.owner)[:messages]
[{#Reference<0.0.7.207>, 10},
 {:DOWN, #Reference<0.0.7.207>, :process, #PID<0.82.0>, :normal}]

task.owner happens to be our current process, or self(). Here we can see two messages waiting to be received. The first is the response to our async task, and the second is that the reference to our Task process has notified us that it has shut down. We could use a receive block to access these messages, but the Task module makes it even easier:

Task.await(task)
10

If we now look back at our current process's messages, we'll see that they have been emptied:

Process.info(task.owner)[:messages]
[]

Parallel map With Task

Let's take a look at a parallel map example using Task's async and await functions. I used this example in the previous article without using Task, and it turns out to be similar but much simpler and easy to reason about now that the complexity has been hidden from us.

defmodule Statuses do
  def map(urls) do
    urls
      |> Enum.map(&amp;(Task.async(fn -> process(&amp;1) end)))
      |> Enum.map(&amp;(Task.await(&amp;1)))
  end
  def process(url) do
    case HTTPoison.get(url) do
      {:ok, %HTTPoison.Response{status_code: status_code}} ->
        {url, status_code}
      {:error, %HTTPoison.Error{reason: reason}} ->
        {:error, reason}
    end
  end
end

We'll use the same tests from last time:

defmodule StatusesTest do
  use ExUnit.Case
  test "parallel status map" do
    urls = [
      url1 = "http://www.fakeresponse.com/api/?sleep=2",
      url2 = "http://www.fakeresponse.com/api/?sleep=1",
      url3 = "http://www.fakeresponse.com/api/?status=500",
      url4 = "https://www.leighhalliday.com",
      url5 = "https://www.reddit.com"
    ]
    assert Statuses.map(urls) == [
      {url1, 200},
      {url2, 200},
      {url3, 500},
      {url4, 200},
      {url5, 200}
    ]
  end
end

These tests finish in approximately 2.1 seconds, which shows us that it is indeed working because it is about the length of the slowest HTTP call of two seconds.

Agent

Agents make it easy to store state in a process. This allows us to share state/date across multiple processes or without having to pass a "big bag" of data to every function that may need access to it.

We'll create a small module that allows to get and set configuration keys which live in an Agent. The start_link function begins the Agent process, and then get and set access and update the configuration values.

defmodule Configure do
  def start_link(initial \\ %{}) do
    Agent.start_link(fn -> initial end, name: __MODULE__)
  end
  def get(key) do
    Agent.get(__MODULE__, &amp;Map.fetch(&amp;1, key))
  end
  def set(key, value) do
    Agent.update(__MODULE__, &amp;Map.put(&amp;1, key, value))
  end
end

You'll notice we didn't return or need a PID in the example above. The reason for that is that we've bound this process to the module's name (which means there can only be one). Let's write some simple tests to ensure it is working as expected.

defmodule ConfigureTest do
  use ExUnit.Case
  test "get with initial value" do
    Configure.start_link(%{env: :production})
    assert Configure.get(:env) == {:ok, :production}
  end
  test "error when missing value" do
    Configure.start_link()
    assert Configure.get(:env) == :error
  end
  test "set and then get value" do
    Configure.start_link()
    Configure.set(:env, :production)
    assert Configure.get(:env) == {:ok, :production}
  end
end

Agents store their state in memory, so you should only store data that can be easily rebuilt in some sort of restart strategy (using a Supervisor!). Although we used a Map in the example above, their state can be any Elixir data type.

All processes in Elixir process requests sequentially. Even though Elixir is highly concurrent, each process handles one request at a time. If a request to the process is slow, it will block everyone else wanting access to the data in this process. This is very useful, though. Requests are handled sequentially in the order that they come in, so it is very predictable in that sense. Just make sure that "heavy lifting" or slow calculations aren't done in the Agent process itself but rather in the caller process or elsewhere.

If you are looking for something more powerful or flexible, check out ETS. For a comparison between ETS, Agents, and other external tools such as Redis, there is a great article written by Barry Jones about this topic.

GenServer

So far we have looked at Tasks for managing concurrent code execution and Agents for managing state within a process. Next we will look at the GenServer module, which combines state with concurrent code execution.

We'll convert the MyLogger example from Part 1 into a module which uses GenServer.

defmodule MyLogger do
  use GenServer
  # Client
  def start_link do
    GenServer.start_link(__MODULE__, 0)
  end
  def log(pid, msg) do
    GenServer.cast(pid, {:log, msg})
  end
  def print_stats(pid) do
    GenServer.cast(pid, {:print_stats})
  end
  def return_stats(pid) do
    GenServer.call(pid, {:return_stats})
  end
  # Server
  def handle_cast({:log, msg}, count) do
    IO.puts msg
    {:noreply, count + 1}
  end
  def handle_cast({:print_stats}, count) do
    IO.puts "I've logged #{count} messages"
    {:noreply, count}
  end
  def handle_call({:return_stats}, _from, count) do
    {:reply, count, count}
  end
end

The first thing you'll notice is that we included the use GenServer line at the top of our module. This allows us to take advantage of the GenServer functionality.

Next up, you'll see the start_link function. What this does is call the GenServer.start_link function, spawning a new process for the current module and providing the initial state, which in this case is 0. It returns a tuple with {:ok, pid}.

GenServers provide both synchronous and asynchronous functionality. Synchronous, where you want to block for an answer before continuing, is called call, whereas asynchronous requests are called cast.

Cast - asynchronous

When we call the line of code GenServer.cast(pid, {:log, msg}) inside of the log function, it will send a message over to our process with {:log, msg} as the arguments. It's up to us to write a function for handling this message.

We do that by implementing the handle_cast function, where the first argument should match the incoming arguments being sent over: {:log, msg}, and the second argument is the current state of our process.

Inside of the handle_cast function, we should perform the task that we're handling and then respond with a tuple containing the atom :noreply along with the new state of our process.

def handle_cast({:log, msg}, count) do
  IO.puts msg
  {:noreply, count + 1}
end

Call - synchronous

When we need some sort of response from the process, it's time for us to use the call functionality. GenServer.call(pid, {:return_stats}) sends a message to our process with the argument {:return_stats}. Very much like the cast functionality, it is up to us to write a function called handle_call, which has the job of dealing with that incoming message and providing a response.

def handle_call({:return_stats}, _from, count) do
  {:reply, count, count}
end

The arguments differ slightly from cast to call. It contains an additional from argument, which is the process that made the request. The response is different as well. It should be a tuple made up of three items: The first is an atom of :reply, the second is the value we should send back to the caller as the response, and the third is the new state our process should have.

In the example above, the return value and the new state are the same, which is why count is repeated twice.

Let's use our module to try it out!

{:ok, pid} = MyLogger.start_link
MyLogger.log(pid, "First message")
MyLogger.log(pid, "Another message")
MyLogger.print_stats(pid)
stats = MyLogger.return_stats(pid)

Conclusion

In this article, we have taken a look at three concurrent code abstractions that build upon the underlying functionality provided by Elixir. The first was Task, which executes code inside of a separate process and optionally waits for a response. The second was Agent, which is a framework for managing state within a process. The third was GenServer, a framework for executing code synchronously or asynchronously and managing state at the same time.

What we're missing is the ability to monitor our processes and react if and when they crash. We can do this ourselves, but if we did, we'd be missing out on some fantastic functionality which comes with Supervisors and OTP. Essentially they allow us to monitor processes and define what should happen if they crash, automatically restarting the monitored process(es) and their children. We'll explore this in my next article!

Stay up to date

We'll never share your email address and you can opt out at any time, we promise.