Elixir gives you the tools to write concurrent code easily. In an earlier post, I introduced concurrency in Elixir by looking at its building blocks. Processes follow the actor model of concurrency and are the core underlying construct: they communicate by sending messages to, and receiving messages from, other processes.
What you might have seen is that there was a fair amount of boilerplate involved in doing common tasks, such as wanting to keep track of some state within a process or making a blocking call and waiting for a response.
Thankfully Elixir comes with a number of common abstractions to make writing concurrent code even easier. In this article we will be looking at Task, Agent, and GenServer.
Task
The Task module makes doing work concurrently almost effortless. There are only two functions you tend to work with: async, which begins asynchronous work in another process, and await, which waits for that work to finish and provides you with the result.
Let's look at a very simple example to see it in action, and then we'll look at how it might be used to speed up a map by doing it in parallel.
task = Task.async(fn -> 5 + 5 end)
The above line runs the anonymous function in a separate process. You are free to continue on your way, and as long as you have the reference to task handy, you can access the result of the anonymous function. What the async function returns is a Task struct holding the PID of the process running the task, the PID of its owner (the caller), and a reference, which is needed to fetch the result:
%Task{owner: #PID<0.80.0>, pid: #PID<0.82.0>, ref: #Reference<0.0.7.207>}
Let's check on the pid of the task to see what is happening with that process:
Process.info(task.pid)
# => nil
Process.alive?(task.pid)
# => false
As you can see, that process is no longer alive. As soon as the Task has finished its work, its process shuts down. So how do we access the response from it then? The key is to look in our own mailbox (that of the calling process):
Process.info(task.owner)[:messages]
# => [{#Reference<0.0.7.207>, 10},
#     {:DOWN, #Reference<0.0.7.207>, :process, #PID<0.82.0>, :normal}]
task.owner happens to be our current process, or self(). Here we can see two messages waiting to be received. The first is the response to our async task, and the second is a :DOWN message from the monitor on the Task process, notifying us that it has shut down normally. We could use a receive block to access these messages, but the Task module makes it even easier:
Task.await(task)
# => 10
If we now look back at our current process's messages, we'll see that they have been emptied:
Process.info(task.owner)[:messages]
# => []
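For comparison, here is a minimal sketch of collecting the same reply by hand with a receive block. It assumes the reply arrives as {ref, result}, matching the mailbox contents shown above, and the one-second timeout is purely illustrative. In practice Task.await does this bookkeeping for you.

```elixir
task = Task.async(fn -> 5 + 5 end)
ref = task.ref

result =
  receive do
    # The reply is tagged with the task's reference.
    {^ref, value} ->
      # Stop monitoring and drop any :DOWN message already in the mailbox.
      Process.demonitor(ref, [:flush])
      value
  after
    # Illustrative timeout, not the one Task.await uses.
    1_000 -> :timeout
  end

IO.inspect(result)
```

The pinned ^ref makes sure we only pick up the reply that belongs to this particular task, even if other messages are sitting in the mailbox.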
Parallel map With Task
Let's take a look at a parallel map example using Task's async and await functions. I used this example in the previous article without Task; this version is similar but much simpler and easier to reason about now that the complexity has been hidden from us.
defmodule Statuses do
  def map(urls) do
    urls
    |> Enum.map(&(Task.async(fn -> process(&1) end)))
    |> Enum.map(&(Task.await(&1)))
  end

  def process(url) do
    case HTTPoison.get(url) do
      {:ok, %HTTPoison.Response{status_code: status_code}} ->
        {url, status_code}

      {:error, %HTTPoison.Error{reason: reason}} ->
        {:error, reason}
    end
  end
end
We'll use the same tests from last time:
defmodule StatusesTest do
  use ExUnit.Case

  test "parallel status map" do
    urls = [
      url1 = "http://www.fakeresponse.com/api/?sleep=2",
      url2 = "http://www.fakeresponse.com/api/?sleep=1",
      url3 = "http://www.fakeresponse.com/api/?status=500",
      url4 = "https://www.leighhalliday.com",
      url5 = "https://www.reddit.com"
    ]

    assert Statuses.map(urls) == [
      {url1, 200},
      {url2, 200},
      {url3, 500},
      {url4, 200},
      {url5, 200}
    ]
  end
end
These tests finish in approximately 2.1 seconds, which shows that the requests are indeed running in parallel: the total is about the length of the slowest HTTP call (two seconds) rather than the sum of all of them.
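If you want to see the same effect without network access, the following sketch swaps the HTTP calls for Process.sleep. The ParallelSketch module, its pmap function, and the slow function are hypothetical names used only for illustration; the async/await pattern is the same as in Statuses.map.

```elixir
defmodule ParallelSketch do
  # Hypothetical helper: runs `fun` on each item in its own Task,
  # then collects the results in the original order.
  def pmap(items, fun) do
    items
    |> Enum.map(fn item -> Task.async(fn -> fun.(item) end) end)
    |> Enum.map(&Task.await/1)
  end
end

# Simulated slow work: sleep for `ms` milliseconds, then return `ms`.
slow = fn ms ->
  Process.sleep(ms)
  ms
end

# :timer.tc returns {elapsed_microseconds, result}.
{elapsed_us, results} =
  :timer.tc(fn -> ParallelSketch.pmap([300, 100, 200], slow) end)

elapsed_ms = div(elapsed_us, 1000)
IO.inspect(results)
IO.puts("took roughly #{elapsed_ms} ms")
```

Run sequentially, the three sleeps would add up to about 600 ms. Run in parallel, the elapsed time should land close to the slowest one.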
Agent
Agents make it easy to store state in a process. This allows us to share state/data across multiple processes without having to pass a "big bag" of data to every function that may need access to it.
We'll create a small module that allows us to get and set configuration keys which live in an Agent. The start_link function begins the Agent process, and then get and set access and update the configuration values.
defmodule Configure do
  def start_link(initial \\ %{}) do
    Agent.start_link(fn -> initial end, name: __MODULE__)
  end

  def get(key) do
    Agent.get(__MODULE__, &Map.fetch(&1, key))
  end

  def set(key, value) do
    Agent.update(__MODULE__, &Map.put(&1, key, value))
  end
end
You'll notice we didn't return or need a PID in the example above. The reason for that is that we've bound this process to the module's name (which means there can only be one). Let's write some simple tests to ensure it is working as expected.
defmodule ConfigureTest do
  use ExUnit.Case

  test "get with initial value" do
    Configure.start_link(%{env: :production})
    assert Configure.get(:env) == {:ok, :production}
  end

  test "error when missing value" do
    Configure.start_link()
    assert Configure.get(:env) == :error
  end

  test "set and then get value" do
    Configure.start_link()
    Configure.set(:env, :production)
    assert Configure.get(:env) == {:ok, :production}
  end
end
Agents store their state in memory, so you should only store data that can be easily rebuilt as part of some sort of restart strategy (using a Supervisor!). Although we used a Map in the example above, an Agent's state can be any Elixir data type.
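As a small illustration of non-map state, here is a sketch of an Agent that holds a plain integer counter. The counter itself is an assumption made for this example and is not part of the Configure module.

```elixir
# Assumption: an integer counter as the Agent's state instead of a map.
{:ok, counter} = Agent.start_link(fn -> 0 end)

# Increment the state.
:ok = Agent.update(counter, fn n -> n + 1 end)

# Read and change the state in one step: the function returns
# {value_for_the_caller, new_state}.
previous = Agent.get_and_update(counter, fn n -> {n, n + 1} end)

current = Agent.get(counter, fn n -> n end)
IO.inspect({previous, current})
```

Because this Agent is not registered under a name, we address it through the PID returned by Agent.start_link.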
Every process in Elixir handles requests sequentially. Even though Elixir is highly concurrent, each process handles one request at a time. If a request to the process is slow, it will block everyone else wanting access to the data in this process. This is also very useful, though: requests are handled in the order that they come in, so the behavior is very predictable. Just make sure that "heavy lifting" or slow calculations aren't done in the Agent process itself but rather in the caller process or elsewhere.
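A minimal sketch of that advice, assuming a hypothetical expensive_sum function that stands in for any slow calculation: fetch the data from the Agent quickly, then do the slow work in the caller.

```elixir
{:ok, agent} = Agent.start_link(fn -> %{numbers: [1, 2, 3]} end)

# Hypothetical stand-in for a slow calculation.
expensive_sum = fn numbers ->
  Process.sleep(100)
  Enum.sum(numbers)
end

# Avoid: the slow function would run inside the Agent process
# and block every other caller while it sleeps.
# Agent.get(agent, fn state -> expensive_sum.(state.numbers) end)

# Prefer: fetch the data quickly, then do the slow work in the caller.
numbers = Agent.get(agent, fn state -> state.numbers end)
total = expensive_sum.(numbers)
IO.inspect(total)
```

The trade-off is that the data is copied to the caller, so this works best when the state being fetched is reasonably small.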
If you are looking for something more powerful or flexible, check out ETS. For a comparison between ETS, Agents, and other external tools such as Redis, there is a great article written by Barry Jones about this topic.
GenServer
So far we have looked at Tasks for managing concurrent code execution and Agents for managing state within a process. Next we will look at the GenServer module, which combines state with concurrent code execution.
We'll convert the MyLogger example from Part 1 into a module which uses GenServer.
defmodule MyLogger do
  use GenServer

  # Client

  def start_link do
    GenServer.start_link(__MODULE__, 0)
  end

  def log(pid, msg) do
    GenServer.cast(pid, {:log, msg})
  end

  def print_stats(pid) do
    GenServer.cast(pid, {:print_stats})
  end

  def return_stats(pid) do
    GenServer.call(pid, {:return_stats})
  end

  # Server

  def handle_cast({:log, msg}, count) do
    IO.puts msg
    {:noreply, count + 1}
  end

  def handle_cast({:print_stats}, count) do
    IO.puts "I've logged #{count} messages"
    {:noreply, count}
  end

  def handle_call({:return_stats}, _from, count) do
    {:reply, count, count}
  end
end
The first thing you'll notice is that we included the use GenServer line at the top of our module. This allows us to take advantage of the GenServer functionality.
Next up, you'll see the start_link function. It calls GenServer.start_link, which spawns a new process for the current module and provides the initial state, in this case 0. It returns the tuple {:ok, pid}.
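To show how the initial state travels, here is a minimal sketch with the init/1 callback written out. The second argument to GenServer.start_link is handed to init/1, and the state in its {:ok, state} return value becomes the state of the process. The CounterSketch module is hypothetical, and :sys.get_state is used only to peek at the state for illustration. MyLogger relies on the default init/1 that use GenServer supplies, which recent Elixir versions may warn about.

```elixir
defmodule CounterSketch do
  use GenServer

  # Client: starts the process with the given initial state.
  def start_link(initial \\ 0) do
    GenServer.start_link(__MODULE__, initial)
  end

  # Server: receives the second argument of GenServer.start_link/2.
  @impl true
  def init(initial) do
    {:ok, initial}
  end
end

{:ok, pid} = CounterSketch.start_link(5)

# Debugging helper from OTP's :sys module, used here only for illustration.
state = :sys.get_state(pid)
IO.inspect(state)
```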
GenServers provide both synchronous and asynchronous functionality. Synchronous requests, where you want to block for an answer before continuing, are made with call, whereas asynchronous requests are made with cast.
Cast - asynchronous
When we run GenServer.cast(pid, {:log, msg}) inside of the log function, it sends the message {:log, msg} over to our process. It's up to us to write a function for handling this message.
We do that by implementing the handle_cast function, where the first argument should match the incoming message being sent over, {:log, msg}, and the second argument is the current state of our process.
Inside of the handle_cast function, we should perform the task that we're handling and then respond with a tuple containing the atom :noreply along with the new state of our process.
def handle_cast({:log, msg}, count) do
  IO.puts msg
  {:noreply, count + 1}
end
Call - synchronous
When we need some sort of response from the process, it's time for us to use the call functionality. GenServer.call(pid, {:return_stats}) sends a message to our process with the argument {:return_stats}. Very much like the cast functionality, it is up to us to write a function called handle_call, which has the job of dealing with that incoming message and providing a response.
def handle_call({:return_stats}, _from, count) do
  {:reply, count, count}
end
The arguments differ slightly from cast to call. handle_call receives an additional from argument, which identifies the caller that made the request (a tuple containing the caller's PID and a unique tag). The response is different as well. It should be a tuple made up of three items: the first is the atom :reply, the second is the value we should send back to the caller as the response, and the third is the new state our process should have.
In the example above, the return value and the new state are the same, which is why count appears twice.
Let's use our module to try it out!
{:ok, pid} = MyLogger.start_link
MyLogger.log(pid, "First message")
MyLogger.log(pid, "Another message")
MyLogger.print_stats(pid)
stats = MyLogger.return_stats(pid)
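To make the difference between cast and call tangible, here is a sketch that times both. The SlowLogger module and its :slow_log message are hypothetical, and the 200 ms sleep stands in for slow work inside the server. The cast should return right away, while the call has to wait until the earlier cast has been handled, because the process works through its mailbox in order.

```elixir
defmodule SlowLogger do
  use GenServer

  @impl true
  def init(count), do: {:ok, count}

  # Simulates slow work before updating the state.
  @impl true
  def handle_cast(:slow_log, count) do
    Process.sleep(200)
    {:noreply, count + 1}
  end

  @impl true
  def handle_call(:return_stats, _from, count) do
    {:reply, count, count}
  end
end

{:ok, pid} = GenServer.start_link(SlowLogger, 0)

# :timer.tc returns {elapsed_microseconds, result}.
{cast_us, :ok} = :timer.tc(fn -> GenServer.cast(pid, :slow_log) end)
{call_us, count} = :timer.tc(fn -> GenServer.call(pid, :return_stats) end)

IO.puts("cast returned after #{div(cast_us, 1000)} ms")
IO.puts("call returned #{count} after #{div(call_us, 1000)} ms")
```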
Conclusion
In this article, we have taken a look at three concurrent code abstractions that build upon the underlying functionality provided by Elixir. The first was Task, which executes code inside of a separate process and optionally waits for a response. The second was Agent, which is a framework for managing state within a process. The third was GenServer, a framework for executing code synchronously or asynchronously and managing state at the same time.
What we're missing is the ability to monitor our processes and react if and when they crash. We can do this ourselves, but if we did, we'd be missing out on some fantastic functionality which comes with Supervisors and OTP. Essentially they allow us to monitor processes and define what should happen if they crash, automatically restarting the monitored process(es) and their children. We'll explore this in my next article!