Microservice Communication with Queues

Written by: Leigh Halliday

Microservices are small programs that handle one task. A microservice that is never used is useless, though: it's the system as a whole that provides value to the user. Microservices work together by passing messages back and forth so that they can accomplish the larger task.

Communication is key, but there are a variety of ways this can be accomplished. A pretty standard way is through a RESTful API, passing JSON back and forth over HTTP. In one sense, this is great; it's a form of communication that's well understood. However, it isn't without drawbacks: every service also has to deal with HTTP concerns, such as status codes and receiving and parsing requests and responses.

What other ways might microservices communicate back and forth? In this article, we're going to explore the use of a queue, more specifically RabbitMQ.

What Does RabbitMQ Do?

RabbitMQ provides a language-agnostic way for programs to send messages to each other. In simple terms, it allows a "Publisher/Producer" to send a message and allows for a "Consumer" to listen for those messages.

In one of its simpler models, it resembles what many Rails developers are used to with Sidekiq: the ability to distribute asynchronous tasks among one or more workers. Sidekiq is one of the first things I install on all my Rails projects. I don't think RabbitMQ would necessarily take its place, especially for things that work more easily within a Rails environment: sending emails, interacting with Rails models, etc.

It doesn't stop there though. RabbitMQ can also handle Pub/Sub functionality, where a single "event" can be published and one or more consumers can subscribe to that event. You can take this further where consumers can subscribe only to specific events and/or events that match the pattern they're watching for.

Finally, RabbitMQ can allow for RPCs (Remote Procedure Calls), where you're looking for an answer right away from another program... basically calling a function that exists in another program.

In this article, we'll be taking a look at both the "Topic" or pattern-based Pub/Sub approach, as well as how an RPC can be accomplished.

Event-Based and Asynchronous

The first example we'll be working with today is a sports news provider who receives incoming data about scores, goals, players, teams, etc. It has to parse the data, store it, and perform various tasks depending on the incoming data.

To make things a little clearer, let's imagine that, in one of the incoming data streams, we'll be notified about soccer goals.

When we discover that a goal has happened, there are a number of things that we need to do:

  • Parse and normalize the information

  • Store the details locally

  • Update the "box-score" for the game that the goal took place in

  • Update the league leaderboard showing who the top goal scorer is

  • Notify all subscribers (push notification) of a particular league, team, or player

  • And any number of other tasks or analysis that we need to do

Do we need to do all of those tasks in order? Should the program in charge of processing incoming data need to know about all of these tasks and how to accomplish them? I suggest that other than parsing/normalizing the incoming data and maybe even saving it locally, the rest of the tasks can be done asynchronously and that the program shouldn't really know or care about all of these other tasks.

What we can do is have the parser program emit an event (soccer.mls.goal for example), along with its accompanying information:

{
  league: 'MLS',
  team: 'Toronto FC',
  player: 'Sebastian Giovinco',
  opponent: 'New York City FC',
  time: '14:21'
}

The parser can then forget about it! It's done its work of emitting the event. The rest of the work will be done by any number of consumers who have subscribed to this specific event.

Producing in Ruby

To produce or emit events in Ruby, the first thing we need to do is install bunny, the client gem that allows Ruby to communicate with RabbitMQ. As an example, here is some fake incoming data that needs to trigger the goal event for soccer.

# Imagine the parsing happens here :)
soccer = Soccer.new
soccer.emit_goal(
  league: 'MLS',
  team: 'Toronto FC',
  player: 'Sebastian Giovinco',
  opponent: 'New York City FC',
  time: '14:21'
)

Let's next take a look at the emit_goal function inside of the Soccer class, which builds the event slug and packages the data together to be included in the event being emitted:

class Soccer
  include EventEmitter
  def emit_goal(raw_details)
    slug = "soccer.#{raw_details[:league]}.goal".downcase # "soccer.mls.goal"
    payload = raw_details.slice(:league, :team, :player, :opponent, :time)
    emit('live_events', slug, payload)
  end
end
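One detail in emit_goal is worth a second look: the league name is interpolated straight into the slug. That works for "MLS", but dots separate the words of a routing key, so a league name containing dots or spaces would produce a slug with unexpected segments. Here is a minimal sketch of one way to guard against that. The routing_key_for helper is hypothetical and not part of the original Soccer class.

```ruby
# Hypothetical helper (not part of the original Soccer class): builds a
# dot-separated routing key, normalizing each segment so that a value such as
# "Premier League" or "U.S. Open Cup" cannot introduce extra words.
def routing_key_for(*segments)
  normalized = segments.map do |segment|
    # Lowercase, collapse anything that isn't a letter or digit into "_",
    # then trim underscores left at either end.
    segment.to_s.downcase.gsub(/[^a-z0-9]+/, '_').gsub(/\A_+|_+\z/, '')
  end
  normalized.join('.')
end

puts routing_key_for('soccer', 'MLS', 'goal')            # soccer.mls.goal
puts routing_key_for('soccer', 'Premier League', 'goal') # soccer.premier_league.goal
```

With a helper like this, every slug keeps the same three-word shape (sport, league, event), which is what pattern-based subscriptions rely on.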

The 'live_events' string names the Exchange to publish the event to. An Exchange is basically a router that decides which Queue(s) the event should be placed into. The emit method lives inside a Module I created to simplify emitting events:

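require 'bunny'
require 'json'
module EventEmitter
  # Publishes `payload` as JSON to the topic exchange named `topic`,
  # using `slug` as the routing key.
  def emit(topic, slug, payload)
    conn = Bunny.new # with no arguments, connects to a broker on localhost
    conn.start
    ch = conn.create_channel
    x = ch.topic(topic) # declares the topic exchange if it doesn't exist yet
    x.publish(payload.to_json, routing_key: slug)
    puts " [OUT] #{slug}:#{payload}"
    conn.close
  end
end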

It receives the name of the topic exchange, the event slug (used as the routing key), and the event payload, and publishes that information to RabbitMQ.
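Note that emit opens and closes a connection for every single event. For a parser that emits many events, it may be worth keeping one exchange object around instead. The sketch below shows that idea under some assumptions: ReusableEmitter and FakeExchange are hypothetical names, and the emitter only assumes it is handed something that responds to publish(body, routing_key:), such as the object returned by Bunny's channel.topic('live_events'). The fake exchange is there so the example runs without a broker.

```ruby
require 'json'

# Sketch of an emitter that reuses one exchange object instead of opening a
# connection per event. `exchange` is assumed to be anything that responds to
# `publish(body, routing_key:)`.
class ReusableEmitter
  def initialize(exchange)
    @exchange = exchange
  end

  def emit(slug, payload)
    @exchange.publish(payload.to_json, routing_key: slug)
  end
end

# Stand-in exchange used only to demonstrate the emitter without a broker.
class FakeExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body, routing_key:)
    @published << [routing_key, body]
  end
end

exchange = FakeExchange.new
emitter = ReusableEmitter.new(exchange)
emitter.emit('soccer.mls.goal', { player: 'Sebastian Giovinco', time: '14:21' })
puts exchange.published.inspect
```

Passing the exchange in also makes the emitting code easy to test, since nothing in it depends on a live RabbitMQ connection.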

Consuming in Ruby

So far we have produced an event, but without a consumer to consume it, the event will be lost. Let's create a Ruby consumer that is listening for all soccer goal events.

You may have noticed that what I was calling the event slug (or the routing_key) looked like "soccer.mls.goal". Picking a pattern to follow is important, because consumers can choose which events to listen for based on a pattern such as "soccer.*.goal": all soccer goals regardless of the league.
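To make the pattern rules concrete: in a topic exchange, a routing key is a list of words separated by dots, * stands for exactly one word, and # stands for zero or more words. The following sketch mimics that matching in plain Ruby purely for illustration. RabbitMQ performs the real matching inside the broker, and the topic_match? and match_words names are my own.

```ruby
# Sketch of AMQP topic matching: words are separated by dots, `*` stands for
# exactly one word and `#` for zero or more words. This only mimics the
# broker's behaviour for illustration; RabbitMQ does the real matching.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

def match_words(pattern_words, key_words)
  # An exhausted pattern matches only an exhausted routing key.
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when '#'
    # `#` may absorb zero, one, or several of the remaining words.
    (0..key_words.length).any? { |n| match_words(rest, key_words.drop(n)) }
  when '*'
    !key_words.empty? && match_words(rest, key_words.drop(1))
  else
    key_words.first == head && match_words(rest, key_words.drop(1))
  end
end

puts topic_match?('soccer.*.goal', 'soccer.mls.goal') # true
puts topic_match?('soccer.*.goal', 'hockey.nhl.goal') # false
puts topic_match?('*.*.goal', 'hockey.nhl.goal')      # true
puts topic_match?('soccer.#', 'soccer.mls.goal')      # true
```

Because * matches exactly one word, "soccer.*.goal" would not match a two-word key such as "soccer.goal", which is another reason to keep every slug in the same shape.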

The consumer in this case will be some code which updates the leaderboard for the top goal scorers in the league. It is kicked off by running a Ruby file with this line:

SoccerLeaderboard.new.live_updates

The SoccerLeaderboard class has a method called live_updates, which calls a receive method provided by an included Module. It passes the exchange name (the topic), the event slug/routing_key pattern to listen for, and a block of code to be called any time there is a new event to process.

class SoccerLeaderboard
  include EventReceiver
  def live_updates
    receive('live_events', 'soccer.*.goal') do |payload|
      puts "#{payload['player']} has scored a new goal."
    end
  end
end

The EventReceiver Module is a little larger, but for the most part it's just setting up a connection to RabbitMQ and telling it what it wants to listen for.

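require 'bunny'
require 'json'
module EventReceiver
  # Binds a temporary queue to the topic exchange named `topic` and yields
  # the parsed payload of every event whose routing key matches `pattern`.
  def receive(topic, pattern, &block)
    conn = Bunny.new
    conn.start
    ch = conn.create_channel
    x = ch.topic(topic)
    # An empty name asks RabbitMQ to generate one; an exclusive queue is
    # deleted when this connection closes.
    q = ch.queue("", exclusive: true)
    q.bind(x, routing_key: pattern)
    puts " [INFO] Waiting for events. To exit press CTRL+C"
    begin
      # block: true keeps the calling thread waiting for deliveries.
      q.subscribe(block: true) do |delivery_info, _properties, body|
        puts " [IN] #{delivery_info.routing_key}:#{body}"
        block.call(JSON.parse(body))
      end
    rescue Interrupt => _
      ch.close
      conn.close
    end
  end
end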
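The receiver hands every message body to JSON.parse, which raises JSON::ParserError when the body isn't valid JSON. Since any program in any language can publish to the exchange, a consumer may want to tolerate a malformed event rather than fail on it. This is a small sketch of that idea; parse_event is a hypothetical helper, not part of the module above.

```ruby
require 'json'

# Hypothetical helper: returns the parsed payload, or nil when the body is not
# valid JSON, so the caller can skip or log the event instead of failing.
def parse_event(body)
  JSON.parse(body)
rescue JSON::ParserError
  nil
end

payload = parse_event('{"player":"Sebastian Giovinco"}')
puts payload['player']
puts parse_event('not json').inspect
```

Inside the subscribe block, the call to block.call could then be skipped whenever parse_event returns nil.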

Consuming in Elixir

I mentioned that RabbitMQ is language agnostic. What I mean by this is that we can not only have a consumer in Ruby listening for events, but we can have a consumer in Elixir listening for events at the same time.

In Elixir, the package I used to connect to RabbitMQ was amqp. One gotcha was that it relies on amqp_client which was giving me problems with Erlang 19. To solve that, I had to link directly to the GitHub repository because it doesn't appear that the fix has been published to Hex yet.

defp deps do
  [
    {:amqp_client, git: "https://github.com/dsrosario/amqp_client.git", branch: "erlang_otp_19", override: true},
    {:amqp, "~> 0.1.5"}
  ]
end

The code to listen for events in Elixir looks like the following. Most of the code inside of the start_listening function is just creating a connection to RabbitMQ and telling it what to subscribe to. The wait_for_messages function is where the event processing takes place.

defmodule GoalNotifications do
  def start_listening do
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Exchange.declare(channel, "live_events", :topic)
    {:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)
    AMQP.Queue.bind(channel, queue_name, "live_events", routing_key: "*.*.goal")
    AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)
    IO.puts " [INFO] Waiting for messages. To exit press CTRL+C, CTRL+C"
    wait_for_messages(channel)
  end
  def wait_for_messages(channel) do
    receive do
      {:basic_deliver, payload, meta} ->
        IO.puts " [x] Received [#{meta.routing_key}] #{payload}"
        wait_for_messages(channel)
    end
  end
end
GoalNotifications.start_listening

RPC... when you need an answer right away

Remote Procedure Calls can be accomplished with RabbitMQ, but I'll be honest: it's more involved than the more typical Pub/Sub examples above. To me, it felt like each side (producer/consumer) has to act as both a producer and a consumer.

The flow is a little like this:

  • Program A asks Program B for some information, providing a unique ID for the request

  • Program A listens for responses that match the same unique ID

  • Program B receives the request, does the work, and provides a response with the same unique ID

  • Program A runs its callback once the matching unique ID is found in a response from Program B
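The flow above can be sketched without a broker at all. In the example below, Ruby's thread-safe Queue stands in for the RabbitMQ queues, and the variable names and the upcase "work" are illustrative assumptions only. What it shows is the role of the unique ID: Program B copies it onto the reply, and Program A accepts only the reply that carries it.

```ruby
require 'securerandom'

# In-memory sketch of the request/reply flow. Ruby's thread-safe Queue stands
# in for the RabbitMQ queues; the names below are illustrative only.
requests = Queue.new # Program A -> Program B
replies  = Queue.new # Program B -> Program A

# Program B: takes a request, does the work, and replies with the same ID.
program_b = Thread.new do
  request = requests.pop
  replies << { correlation_id: request[:correlation_id], body: request[:body].upcase }
end

# Program A: sends a request tagged with a unique ID...
call_id = SecureRandom.uuid
requests << { correlation_id: call_id, body: 'abc123' }

# ...and waits for the reply carrying that same ID, ignoring any other.
reply = replies.pop
reply = replies.pop until reply[:correlation_id] == call_id
program_b.join

puts "Reply for #{call_id}: #{reply[:body]}"
```

With RabbitMQ in the middle, the two queues become a named request queue and a private reply queue, and the ID travels in the message's correlation_id property.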

In this example, we'll be talking about a product's inventory... an answer we need to know right away to be sure that there is stock available for a customer to purchase.

inventory = ProductInventory.new('abc123').inventory
puts "Product has inventory of #{inventory}"

The ProductInventory class is quite simple, mostly because I've hidden the complexity of the RPC call inside of a class called RemoteCall.

class ProductInventory
  attr_accessor :product_sku
  def initialize(product_sku)
    @product_sku = product_sku
  end
  def inventory
    RemoteCall.new('inventory').response(product_sku)
  end
end

Now let's take a look at how RemoteCall is handling it:

require 'bunny'
require 'securerandom'
class RemoteCall
  attr_reader :lock, :condition
  attr_accessor :conn, :channel, :exchange, :reply_queue,
    :remote_response, :call_id, :queue_name
  def initialize(queue_name)
    @queue_name = queue_name
    @conn = Bunny.new
    @conn.start
    @channel = conn.create_channel
    @exchange = channel.default_exchange
    @reply_queue = channel.queue('', exclusive: true)
  end
  def response(payload)
    @lock = Mutex.new
    @condition = ConditionVariable.new
    response_callback(reply_queue)
    self.call_id = SecureRandom.uuid
    puts "Awaiting call with correlation ID #{call_id}"
    exchange.publish(payload,
      routing_key: queue_name,
      correlation_id: call_id,
      reply_to: reply_queue.name
    )
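    # Wait until the callback has stored a reply. Re-checking guards against a
    # reply that arrives before we start waiting and against spurious wakeups.
    lock.synchronize { condition.wait(lock) until remote_response }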
    remote_response
  end
  private
  def response_callback(reply_queue)
    that = self
    reply_queue.subscribe do |delivery_info, properties, payload|
      if properties[:correlation_id] == that.call_id
        that.remote_response = payload
        that.lock.synchronize { that.condition.signal }
      end
    end
  end
end
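One thing to keep in mind is that RemoteCall#response waits on the condition variable with no upper bound, so the caller would wait indefinitely if the other program never replies. The sketch below shows one way to wait with a deadline using the same Mutex and ConditionVariable building blocks. ReplySlot and its method names are hypothetical and are not part of Bunny or of the RemoteCall class above.

```ruby
# Sketch of a reply slot that waits with a deadline. `ReplySlot` and its
# method names are hypothetical, not part of Bunny or the original RemoteCall.
class ReplySlot
  def initialize
    @lock = Mutex.new
    @condition = ConditionVariable.new
    @value = nil
    @filled = false
  end

  # Called from the consumer thread when the matching reply arrives.
  def fill(value)
    @lock.synchronize do
      @value = value
      @filled = true
      @condition.signal
    end
  end

  # Blocks until a value is available or `timeout` seconds have passed.
  # Returns the value, or nil on timeout.
  def wait(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      until @filled
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @condition.wait(@lock, remaining)
      end
      @value
    end
  end
end

slot = ReplySlot.new
Thread.new { sleep 0.05; slot.fill('42') }
puts slot.wait(1).inspect            # "42"
puts ReplySlot.new.wait(0.1).inspect # nil
```

A caller that gets nil back can then decide what "no answer" means for it, for example treating the product as out of stock or retrying the call.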

So if all that code was for the Producer, what does the Consumer look like? It's kicked off with:

server = InventoryServer.new
server.start

And the InventoryServer looks like:

require 'bunny'
class InventoryServer
  QUEUE_NAME = 'inventory'.freeze
  attr_reader :conn
  def initialize
    @conn = Bunny.new
  end
  def start
    conn.start
    channel = conn.create_channel
    queue = channel.queue(QUEUE_NAME)
    exchange = channel.default_exchange
    subscribe(queue, exchange)
  rescue Interrupt => _
    channel.close
    conn.close
  end
  def subscribe(queue, exchange)
    puts "Listening for inventory calls"
    queue.subscribe(block: true) do |delivery_info, properties, payload|
      puts "Received call with correlation ID #{properties.correlation_id}"
      product_sku = payload
      response = self.class.inventory(product_sku)
      exchange.publish(response.to_s,
        routing_key: properties.reply_to,
        correlation_id: properties.correlation_id
      )
    end
  end
  def self.inventory(product_sku)
    42
  end
end

Wow... that's a lot of work to make an RPC! RabbitMQ has a great guide explaining how this works in a variety of different languages.

Conclusion

Microservices don't always need to communicate synchronously, and they don't always need to communicate over HTTP/JSON either. They can, but next time you're thinking about how they should speak to each other, why not consider doing it asynchronously using RabbitMQ? It comes with a great interface for monitoring the activity of the queue and has fantastic client support in a variety of popular languages. It's fast, reliable, and scalable.

Microservices aren't free though... I think it's worthwhile considering whether the extra complexity involved in setting up separate services and providing them a way to communicate couldn't be better handled using something like Sidekiq and writing clean, modular code.
