ActionCable is coming to Rails 5 and brings with it the promise of using WebSockets directly in Rails.
Ruby has a notoriously bad concurrency story, and that certainly extends into the realm of WebSockets and pubsub. ActionCable may be suitable for a small number of authenticated sessions, but scaling persistent connections to thousands or tens-of-thousands won't be easy. That's all right, the beauty of the internet is that you can mix and match technologies as you see fit!
There are plenty of other languages that your application can lean on to achieve real-time over WebSockets. In the recent past, you may have reached to Node.js for this kind of work, but there is a better option.
Pubsub is by its nature a distributed system, and it's no secret that Erlang is a superstar in the distributed systems realm. Rather than reach for Erlang directly, we'll leverage Elixir's productive tooling to build out a WebSocket microservice.
By using a surrogate microservice, your Rails application remains the authority on business logic. It still handles all database interactions, sending email, and plugs away at the other work Rails excels at -- it just doesn't hold thousands of connections.
Gluing PubSub Together
You may recall from my previous article on service communication with pubsub that it is very easy to publish messages from Ruby. As a refresher, here is the simple Publisher
class:
require 'redis' class Publisher def publish(channel, message) Redis.current.publish(channel, message) end end
The Rails application will publish score updates whenever data changes:
# from a controller or a background worker publisher.publish('teams-123-scores', score.to_json)
Publishing is the easy part though! So long as you have a consistent naming scheme for channels and serialize your messages, it is very straightforward. What about the Elixir side?
Getting Started With Compatibility
In the Elixir world, Phoenix is an obvious choice for hosting surrogate WebSocket connections. There are two pubsub implementations broadly available for Phoenix: Redis
and PG2
(using Erlang's distributed named process groups).
As it turns out, both modules rely on Erlang encoded terms, so neither are conducive to communication outside the Erlang ecosystem. Developing an alternate pubsub module for Phoenix is beyond the scope of this article. Instead, we'll build a simple system that allows Ruby and Elixir to communicate over pubsub with a minimal API.
Setting Up Elixir and Redis PubSub
From here on, I'll assume you are familiar with Elixir, the mix
build tool, and testing with ExUnit. Our first step is to create a surrogate project:
mix new surrogate && cd surrogate
Now we'll install the Redis client. There are a few different Redis clients in the Erlang world, but we'll be using the native Elixir client Redix. It is easy to configure, easy to test, and wonderfully written. Open the generated mix.exs
and modify the deps
:
defp deps do [{:redix, "~> 0.3"}] end
Now retrieve the dependencies:
mix deps.get
That's enough to get the project set up and explore pubsub with a small test. The only functions our module needs are subscribe/1
, unsubscribe/1
, and possibly broadcast/2
. That makes it consistent with the Phoenix.PubSub
API. Our test gets started by testing subscribe/1
:
defmodule SurrogateTest do use ExUnit.Case alias Surrogate.PubSub test "subscribe/1 links to a topic" do {:ok, _pid} = PubSub.start_link :ok = PubSub.subscribe("topic.1") :ok = PubSub.subscribe("topic.2") {:ok, other_conn} = Redix.start_link {:ok, _} = Redix.command(other_conn, ~w(PUBLISH topic.1 hello)) {:ok, _} = Redix.command(other_conn, ~w(PUBLISH topic.2 world)) assert_receive "hello" assert_receive "world" end
The test first establishes a link to a new surrogate server and then uses the subscribe/1
API to register the test process for published messages. Next it starts another link directly through Redix and publishes messages on both of the subscribed topics. Finally, it verifies that both of the simple hello
and world
messages were received by the test process.
Actually defining the Surrogate.PubSub
module is a bit more involved. Retaining a list of subscribed processes is essential, as every connection from the web is a separate process. In order to retain state, we must use an Agent
or a GenServer
. As our module combines state and functions to manipulate that state, we will use the GenServer
behaviour.
defmodule Surrogate.PubSub do use GenServer def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, [name: __MODULE__]) end ## Callbacks def init(opts) do {:ok, conn} = Redix.PubSub.start_link(opts) {:ok, %{conn: conn, subs: %{}}} end end
The GenServer
behavior is mixed in via use
, and we then define the start_link/1
convenience function. As soon as the process is started, it links itself to Redix.PubSub
and retains a connection to Redis. Now we're ready to add the subscribe/1
api method.
def subscribe(topic) do GenServer.call(__MODULE__, {:subscribe, topic}) end ## Callbacks def handle_call({:subscribe, topic}, {pid, _}, %{conn: conn, subs: subs} = state) do :ok = Redix.PubSub.subscribe(conn, topic, self()) tops = Map.get(subs, pid, MapSet.new) subs = Map.put(subs, pid, MapSet.put(tops, topic)) {:reply, :ok, %{state | subs: subs}} end
Subscribe issues a handle_call/3
, part of the GenServer
behaviour, to synchronously subscribe the caller process to a topic. Simultaneously it adds the calling process (seen here as {pid, _}
) to the server's state.
Every process is a key in the subs
map, and its value is a set of all the topics the process is subscribed to. This mapping is critical for receiving published messages and managing unsubscribes later.
Running the tests now doesn't quite work, because the Redix.PubSub
module sends out-of-band messages back to the pubsub process, and they aren't being handled. To handle these out-of-band messages, we have to use the handle_info/2
callback:
def handle_info({:redix_pubsub, :message, message, topic}, %{subs: subs} = state) do for {pid, topics} <- subs do if MapSet.member?(topics, topic), do: send pid, message end {:noreply, state} end def handle_info(_msg, state) do {:noreply, state} end
There! Now the module can properly handle subscriptions and forwarding messages to the subscribed processes.
mix test test/surrogate_test.exs Compiled lib/surrogate.ex . Finished in 0.1 seconds (0.06s on load, 0.04s on tests) 1 test, 0 failures Randomized with seed 117850
What Else Does PubSub Need?
The subscribe/1
function is almost enough for one-way communication between the Rails app and the Surrogate. There isn't any need to support broadcast/2
(which would send messages back upstream), leaving unsubscribe/1
.
When the current process is unsubscribed, it should no longer receive any forwarded messages. The implementation of unsubscribe/1
is simple, effectively subscribe/1
in reverse. First, add a new test case:
test "unsubscribe/1 unlinks a topic" do {:ok, _pid} = PubSub.start_link :ok = PubSub.subscribe("topic.1") :ok = PubSub.unsubscribe("topic.1") {:ok, other_conn} = Redix.start_link {:ok, _} = Redix.command(other_conn, ~w(PUBLISH topic.1 hello)) refute_receive "hello" end
The test refutes that any message is broadcast after unsubscribing to "topic.1"
. Of course this initially fails -- now we make it pass:
def unsubscribe(topic) do GenServer.call(__MODULE__, {:unsubscribe, topic}) end ## Callbacks def handle_call({:unsubscribe, topic}, {pid, _}, %{subs: subs} = state) do tops = Map.get(subs, pid, MapSet.new) subs = Map.put(subs, pid, MapSet.delete(tops, topic)) {:reply, :ok, %{state | subs: subs}} end
Again we rely on the power of call
, which includes the calling processes' pid
in the second argument. The handler finds the entry for the calling pid
and removes the specified topic from its list. When future messages are published, that topic won't be found and the message won't be forwarded.
This approach is simple for illustration purposes, but it is quite naive and leaves behind dangling references to processes. Production grade implementations, such as Phoenix.PubSub
, run periodic GC to clean up after unsubscribes.
Stitching into WebSockets
Our connections are built on top of Plug, which comes with an adapter for the Cowboy server. Every HTTP connection that comes to Cowboy is wrapped in a separate process. That includes WebSocket connections, which are initiated as HTTP connections and then upgraded to WebSockets.
That's perfect for our pubsub setup! Once a connection is upgraded, the server simply waits for topic
subscriptions and forwards them on to the Surrogate
module.
Add both plug
and cowboy
to deps
in mix.exs
, ensure they are started as applications
, and define Surrogate
as the application to start.
def application do [mod: {Surrogate, []}, applications: [:cowboy, :plug, :logger]] end defp deps do [{:redix, "~> 0.3"}, {:plug, "~> 1.0"}, {:cowboy, "~> 1.0"}] end
In order to treat Surrogate
as an application, it needs a base module that uses the Application behavior. Forgive me if what follows is a little like the how to draw an owl process. There are a lot of details in application setup that are outside the scope of this article, and it is necessary to get a real server.
defmodule Surrogate do use Application alias Plug.Adapters.Cowboy def start(_, _) do import Supervisor.Spec children = [ Cowboy.child_spec(:http, Surrogate.Server, [], [dispatch: dispatch]), worker(Surrogate.PubSub, []) ] opts = [strategy: :one_for_one, name: Surrogate.Supervisor] Supervisor.start_link(children, opts) end defp dispatch do [{:_, [{"/ws", Surrogate.Socket, []}, {:_, Cowboy.Handler, {Surrogate.Server, []}}]}] end end
The Surrogate module is defined as a proper Erlang app, which supervises the pubsub and cowboy workers. The supervision ensures that any time a worker crashes a new one is started up in its place. Notice that the number of modules has expanded; now there are also Surrogate.Server
and Surrogate.Socket
modules. These will handle our incoming HTTP
and WebSocket
connections, respectively.
First, the Surrogate.Server
module is defined as a simple plug that always returns an "OK" text response.
defmodule Surrogate.Server do import Plug.Conn def init(_opts) do end def call(conn, _opts) do conn |> put_resp_content_type("application/text") |> send_resp(200, "OK") end end
We can verify that the server is working now by starting the application up with iex -S mix
and visiting localhost:4000
. If everything went according to plan, you'll see a friendly "OK" in your browser. Now the final module needed to relay messages, Surrogate.Socket
:
defmodule Surrogate.Socket do @behaviour :cowboy_websocket_handler def init(_, _req, _opts) do {:upgrade, :protocol, :cowboy_websocket} end ## Callbacks def websocket_init(_type, req, _opts) do {:ok, req, %{}, 60_000} end end
The module uses the cowboy_websocket_handler
behaviour, which requires callback handlers just like a GenServer
. Upgrading to a WebSocket is handled by websocket_init/3
, which is called whenever a new connection comes in to localhost:4000/ws
. Once connected, we can begin sending and receiving messages through websocket_handle/3
.
alias Surrogate.PubSub def websocket_handle({:text, "ping"}, req, state) do {:reply, {:text, "pong"}, req, state} end def websocket_handle({:text, "subscribe|" <> topic}, req, state) do PubSub.subscribe(topic) {:ok, req, state} end def websocket_handle({:text, "unsubscribe|" <> topic}, req, state) do PubSub.unsubscribe(topic) {:ok, req, state} end def websocket_handle({:text, _}, req, state) do {:ok, req, state} end
Each handler will pattern match on the incoming message, allowing the server to selectively handle subscribe
and unsubscribe
messages accordingly. At this point, the server can handle incoming messages, but it is unable to relay subscribed messages being returned from pubsub. As with all out-of-band messages to a server, those are handled with an _info/3
callback:
def websocket_info(message, req, state) do {:reply, {:text, message}, req, state} end
Here the message
is the exact value being published to the channel, and it is passed along to the WebSocket connection directly. Finally, we can test the round trip between Ruby, Elixir, and a web browser. Restart the surrogate server interactively with iex -S mix
, reload the page, and open up the JavaScript console:
let ws = new WebSocket("ws://localhost:4000/ws") ws.onmessage = (message) => console.log(message.data) ws.send("subscribe|topic.1")
All messages published to topic.
are fed to the console, whether they come from Ruby, a Redis CLI instance, or the Elixir shell. For example, when a 'score' object is serialized and published from Ruby, this is logged as data
in the console:
'{"id":12345,"value":"new score"}'
Monolith This Isn't
The system we've built is more complex than a monolith, but it will scale simply and perform reliably. It will easily handle thousands of topics and thousands of connections, even on the lightest weight hardware. Line for line, it may outweigh a simple naive Node.js implementation, but it is entirely fault tolerant right out of the box. That's the benefit we get from building on top of Erlang.
This system is real-time only; there aren't any mechanisms to rewind or catch up to missed messages after temporary disconnections. That is simply the nature of pubsub. Paired with a lack of channel authorization, it is perfectly suited to pushing updates over broadly available public channels. Think real-time sports updates, leaderboards, stock tickers, etc.
This is not the "beautiful monolith" approach, because that simply doesn't exist when you enter the realm of distributed systems. It relies on an external database to broker communication, which is a service in itself. When external dependencies are already at play, it is wise to leverage languages and frameworks that are perfectly suited to the task.