This article was originally published by Ross Kaffenberger on his personal site, and with his permission, we are sharing it here for Codeship readers.
One of the fundamental concepts in key Ruby libraries that embrace concurrency is the thread pool.
You can find examples of thread pool implementations in gems like puma, concurrent-ruby, celluloid, pmap, parallel, and ruby-thread.
A thread pool is an abstraction for re-using a limited number of threads to perform concurrent work.
General usage of a thread pool might look something like the following, where the :size represents the maximum number of threads open at any given time.
pool = ThreadPool.new(size: 5)

10_000.times do
  pool.schedule { do_work }
end

pool.shutdown
The calculation would be performed 10,000 times across five separate threads.
To get a better understanding of how thread pools work, let’s implement a thread pool in test-driven fashion.
The code samples in this post are run on rubinius-3.14 to take advantage of parallel processing. As you may know, MRI’s global interpreter lock ensures only one thread can execute Ruby code at any one time.
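If you want to see that difference for yourself, here’s a small, throwaway benchmark sketch (not from the original article, and the numbers will vary by machine): CPU-bound work split across threads finishes in roughly the same time as serial execution on MRI, while on Rubinius or JRuby the threaded version can genuinely run faster on multiple cores.

require 'benchmark'

# Naive, CPU-bound work to keep a core busy
def fib(n)
  n < 2 ? n : fib(n - 1) + fib(n - 2)
end

Benchmark.bm(10) do |x|
  x.report('serial')   { 2.times { fib(30) } }
  x.report('threaded') { Array.new(2) { Thread.new { fib(30) } }.each(&:join) }
end
# On MRI, expect the two reports to be close; on Rubinius,
# the threaded run should be noticeably faster on a multicore machine.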
Don’t be afraid
Before we dive in, let’s acknowledge that Rubyists, and most OO programmers in general, are taught to fear multi-threaded concurrency.
The first rule of concurrency on the wiki for JRuby, a Ruby implementation designed to take advantage of native operating system threads, is this:
Don’t do it, if you can avoid it.
For the purpose of this post, I’m going to assume the author means “in production”. In the safety of your development environment, playing with concurrency in Ruby can be a tremendous learning opportunity.
A simple thread pool
So we’ll implement a simple thread pool guided by tests. Our thread pool will use the interface we described earlier while limiting the number of threads used to carry out a set of concurrent “jobs”.
pool = ThreadPool.new(size: 5)
pool.schedule { do_work }
pool.shutdown
Basic Usage
We’ll start with a thread pool that doesn’t do any concurrent processing. It will execute the block given to its #schedule method directly. Though we’ll add other tests later to exercise concurrency in the implementation, this first test will assume the concurrency is already implemented.
Here’s our first test.
require 'minitest/autorun'
require 'minitest/pride'

require_relative './thread_pool'

class TestThreadPool < Minitest::Test
  def test_basic_usage
    pool_size = 5
    pool = ThreadPool.new(size: pool_size)

    mutex = Mutex.new

    iterations = pool_size * 3
    results = Array.new(iterations)

    iterations.times do |i|
      pool.schedule do
        mutex.synchronize do
          results[i] = i + 1
        end
      end
    end

    pool.shutdown

    assert_equal(1.upto(pool_size * 3).to_a, results)
  end
end
Let’s break it down. To test the basic usage of a thread pool scheduler, we’ll pass in an array and augment it within the scheduled blocks. Because Array is not thread safe, we need to use a Mutex object to lock the pooled threads while adding items to the array.
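To see why the mutex matters, consider a sketch like this one (hypothetical, and easiest to reproduce on a truly parallel runtime like Rubinius): incrementing a shared counter is a read-modify-write sequence, so unsynchronized threads can overwrite each other’s updates.

counter = 0

threads = Array.new(5) do
  Thread.new do
    100_000.times { counter += 1 } # read-modify-write; not atomic
  end
end
threads.each(&:join)

counter # expected 500_000, but may come up short without a Mutex

Wrapping the increment in mutex.synchronize serializes access and makes the count reliable.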
The key snippet is here:
pool.schedule do
  mutex.synchronize do
    results[i] = i + 1
  end
end
The test asserts that the results match 1.upto(15) as an array.
To make the tests pass:
class ThreadPool
  def initialize(size:)
  end

  def schedule(*args, &block)
    block.call(args)
  end

  def shutdown
  end
end
We’ve just stubbed out the #initialize and #shutdown methods since additional behavior isn’t needed to get the tests to pass.
You can see the source for this changeset on GitHub.
Saving time
Our next test will demonstrate that we’re actually taking advantage of concurrency by (crudely) measuring the time taken to process multiple jobs.
We’ll use a small test helper method to measure the number of seconds elapsed during execution:
def time_taken
  now = Time.now.to_f
  yield
  Time.now.to_f - now
end
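As a quick sanity check of the helper (a throwaway example, not part of the test suite):

time_taken { sleep 1 } # => a Float just over 1.0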
Our test will schedule 5 jobs that will each sleep for 1 second. If the jobs were executed serially, the total execution time would be at least 5 seconds. Running in parallel on Rubinius, we’d expect threaded execution of 5 jobs across 5 threads to take less time.
def test_time_taken
  pool_size = 5
  pool = ThreadPool.new(size: pool_size)

  elapsed = time_taken do
    pool_size.times do
      pool.schedule { sleep 1 }
    end
    pool.shutdown
  end

  assert_operator 4.5, :>, elapsed,
                  'Elapsed time was too long: %.1f seconds' % elapsed
end
This test fails with our first pass-through implementation of ThreadPool. We can make this test pass by wrapping each scheduled job in its own thread.
class ThreadPool
  def initialize(size:)
    @pool = []
  end

  def schedule(*args, &block)
    @pool << Thread.new { block.call(args) }
  end

  def shutdown
    @pool.map(&:join)
  end
end
We push each of these threads onto an array, @pool, which we can use to join the threads during the #shutdown method. The tests pass again.
Adding Pooling
While we’ve achieved concurrency, you may notice there’s (at least) one problem.
Our current implementation will naively create a new thread for each scheduled job. This may not be an issue for small, trivial use cases, but it can be easily abused. Thread creation does not come for free; every OS has its limit.
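If you want a rough feel for that cost, a quick and unscientific benchmark like the following shows thread creation is far from free; exact numbers vary widely by machine and Ruby implementation.

require 'benchmark'

# Spawn and immediately join threads that do nothing,
# isolating the cost of thread creation itself
puts Benchmark.measure { 10_000.times { Thread.new {}.join } }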
We’ll prove it with our next test in which we’ll schedule a large number of jobs.
def test_pool_size_limit
  pool_size = 5
  pool = ThreadPool.new(size: pool_size)

  mutex = Mutex.new
  threads = Set.new

  100_000.times do
    pool.schedule do
      mutex.synchronize do
        threads << Thread.current
      end
    end
  end

  pool.shutdown

  assert_equal(pool_size, threads.size)
end
Running these tests on my mid-2014 MacBook Pro, I hit the resource limit:
TestThreadPool#test_pool_size_limit:
ThreadError: can't create Thread: Resource temporarily unavailable
    /Users/ross/dev/rossta/enumerable/examples/thread_pool/thread_pool_test.rb:53:in `initialize'
    /Users/ross/dev/rossta/enumerable/examples/thread_pool/thread_pool_test.rb:53:in `new'
    /Users/ross/dev/rossta/enumerable/examples/thread_pool/thread_pool_test.rb:53:in `block in test_pool_size_limit'
    /Users/ross/dev/rossta/enumerable/examples/thread_pool/thread_pool_test.rb:52:in `times'
    /Users/ross/dev/rossta/enumerable/examples/thread_pool/thread_pool_test.rb:52:in `test_pool_size_limit'
This is the whole point of our ThreadPool: to limit the number of threads in use. To implement this behavior, instead of executing each scheduled job in a new thread, we’ll add it to a Queue. We’ll separately create a limited number of threads whose responsibility will be to pop new “jobs” off the queue and execute them when available.
The beauty of Queue is that it is thread-safe; multiple threads in the thread pool can access this resource without corrupting its contents.
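As a standalone illustration (separate from our pool), a consumer thread can safely block on an empty queue until a producer pushes work onto it:

queue = Queue.new

consumer = Thread.new do
  2.times { puts "popped #{queue.pop}" } # blocks while the queue is empty
end

queue << :first_job
queue << :second_job

consumer.join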
Here’s the revised implementation:
class ThreadPool
  def initialize(size:)
    @size = size
    @jobs = Queue.new

    @pool = Array.new(size) do
      Thread.new do
        catch(:exit) do
          loop do
            job, args = @jobs.pop
            job.call(*args)
          end
        end
      end
    end
  end

  def schedule(*args, &block)
    @jobs << [block, args]
  end

  def shutdown
    @size.times do
      schedule { throw :exit }
    end
    @pool.map(&:join)
  end
end
Let’s start with the #schedule method. Where before we immediately created a new thread to call the block, we instead push the block and its arguments onto the new @jobs queue instance variable.
def schedule(*args, &block)
  @jobs << [block, args]
end
This instance variable is set up in the #initialize method, where we also eagerly create the maximum number of threads that will become our worker @pool.
def initialize(size:)
  @size = size
  @jobs = Queue.new

  @pool = Array.new(size) do
    Thread.new do
      catch(:exit) do
        loop do
          job, args = @jobs.pop
          job.call(*args)
        end
      end
    end
  end
end
Each thread runs an infinite loop that repeatedly pops jobs off the queue with @jobs.pop. The Queue#pop method blocks when the queue is empty, so each thread will happily wait for new jobs to be scheduled at this point.
Notice also the catch block. We break out of each thread’s loop by scheduling a job that throws :exit, once for each thread, in the #shutdown method. This means that jobs currently executing when the shutdown method is called will be able to complete before the threads are joined.
def shutdown
  @size.times do
    schedule { throw :exit }
  end
  @pool.map(&:join)
end
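If Ruby’s catch and throw are unfamiliar, here’s a minimal, standalone illustration of the control flow the pool relies on:

catch(:exit) do
  loop do
    puts 'working...'
    throw :exit # unwinds the stack to the enclosing catch(:exit) block
  end
end
puts 'done' # execution resumes here after the throw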
Now we have a simple abstraction for handling concurrent work across a limited number of threads. For more on this implementation, check out the original author’s blog post on the subject.
In the Wild
Of course, if you’re planning on using a thread pool in production code, you may be better off leveraging the hard work of others. Our implementation omits some key considerations, like providing reflection, handling timeouts, dealing with exceptions, and better thread safety. Let’s look at some alternatives in the community.
The ruby-thread project provides a few extensions to the standard library Thread class, including Thread::Pool. On the surface, usage of Thread::Pool is very similar to what we came up with.
require 'thread/pool'

pool = Thread.pool(4)

10.times {
  pool.process {
    sleep 2
    puts 'lol'
  }
}

pool.shutdown
This implementation goes further to ensure standard locking functions work properly across multiple Ruby implementations. Among other things, it has support for handling timeouts, methods for introspecting pool objects, like #running? and #terminated?, and optimizations for dealing with unused threads. On reading the source, my impression is that the implementation was heavily inspired by Puma::ThreadPool, a class used internally by the puma web server. You be the judge.
Celluloid, the most famous collection of concurrency abstractions, provides a thread pool class, most commonly accessed via a class method provided by the Celluloid mixin.
class MyWorker
  include Celluloid

  def add_one(number)
    # roflscale computation goes here
    number + 1
  end
end

pool = MyWorker.pool
pool.future(:add_one, 5).value
The new hotness for working with concurrency is the toolkit provided by concurrent-ruby. While Celluloid is easy to get started with, Concurrent is the “Swiss Army Knife”, providing a large array of abstractions and classes, including futures, promises, thread-safe collections, maybes, and so on. Concurrent provides several different thread pool implementations for different purposes, each supporting a number of configurations, including min and max pool sizes, advanced shutdown behaviors, and max queue size (along with a fallback policy for when the job queue size is exceeded), to name a few.
pool = Concurrent::FixedThreadPool.new(5) # 5 threads

pool.post do
  # some parallel work
end
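For finer-grained control over the knobs mentioned above, concurrent-ruby also provides Concurrent::ThreadPoolExecutor. A sketch of a configured pool might look like this (the option values here are arbitrary):

pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 2,               # keep at least 2 threads alive
  max_threads: 10,              # grow to at most 10 threads under load
  max_queue: 100,               # buffer at most 100 pending jobs
  fallback_policy: :caller_runs # when the queue is full, run the job in the caller's thread
)

pool.post do
  # some parallel work
end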
Consider the Thread Pool overview provided in the Concurrent docs required reading.
And, of course, the ultimate thread pool for Rails developers is Sidekiq. Unlike the examples we’ve discussed so far, the components of the Sidekiq thread pool model are distributed: the caller, the job queue, and the threaded workers all run in separate processes, often on separate machines, in a production environment.
Credits
In preparing for this post, I read through the source of several thread pool implementations from various sources, ranging from simple examples, to internal interfaces, to public-facing libraries.
Thread::Pool
Celluloid::Group::Pool
Concurrent::RubyThreadPoolExecutor
Puma::ThreadPool
Though it’s well documented how much threads suck, that shouldn’t discourage Rubyists from getting some first-hand experience working with threads, supporting classes from the standard library like Queue, Mutex, and ConditionVariable, and generic abstractions like ThreadPool.
Connection Pool, the Sequel
Related, though not necessarily thread-based, is the concept of a connection pool, which limits the number of network connections to a particular service. You’ll find connection pools in activerecord, mongodb, and, as a standalone abstraction, in the appropriately named connection_pool gem.
It’s good to know about connection pools when setting up a connection to Redis from your Ruby application with the redis-rb gem. As of this writing, this client does not manage a connection pool for you, so a common gotcha is a memory leak that originates from creating a lot of open connections to the Redis server. You can avoid this with ConnectionPool:
redis = ConnectionPool.new { Redis.new }
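Connections are then checked out for the duration of a block, so only a bounded number are ever open at once; a sketch of typical usage:

redis.with do |conn|
  conn.set('foo', 'bar') # conn is returned to the pool when the block exits
end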
Much like ThreadPool, having at least a cursory understanding of what’s happening underneath can help you avoid issues with managing resources like network connections.