Forking and IPC in Ruby, Part II

In the first article, we examined why the fork() system call is useful and where it fits into the grand scheme of things. We saw that by passing a block to Kernel#fork or Process.fork it is possible to execute arbitrary code concurrently (or in parallel if there are multiple processors). In addition, we saw that although forking is relatively expensive, it can compete with threading if the Ruby implementation doesn’t squander Copy-on-Write optimization.

Unfortunately, if a method is called in a fork, any data that it produces will not be available to the parent process, due to process isolation. What we need is a conduit through which we can pass data between processes. Here, in Part II, we’re going to cover some interprocess communication mechanisms as well as more ways that you can utilize fork.

The code in this article is available on GitHub.

Pipes

Pipes enable data to flow in one direction between a pair of file descriptors. Since forks inherit open file descriptors, pipes can be used to communicate data between parent and child processes. Creating a pipe in Ruby is easy with IO.pipe.

>> reader, writer = IO.pipe
=> [#<IO:fd 5>, #<IO:fd 6>]

Here, writer will only write, and reader will only read. Sounds simple, but unfortunately there are some nuances. If you want to use a pipe more than once, IO#puts complemented by IO#gets is the simplest way,

>> writer.puts("hello world")
>> reader.gets
=> "hello world\n"

>> writer.puts("hello world, again")
>> reader.gets
=> "hello world, again\n"

Pipes communicate data using byte streams, so they need delimiters to know when to stop reading data. IO#gets reads until it receives a “\n”. Unlike IO#puts, IO#write does not automatically append “\n” to data, so an IO#gets that follows an IO#write of unterminated data will block indefinitely.

>> writer.write("this string is terminated\n")
>> reader.gets
=> "this string is terminated\n"

>> writer.write("this string is not terminated")
>> reader.gets
*waits indefinitely for \n*

Unlike IO#gets, IO#read (called without a length) ignores delimiters and waits for EOF. A pipe signals EOF only once every copy of its write end has been closed, so IO#read is best suited to one-time writes.

>> reader, writer = IO.pipe
>> writer.write("hello world")
>> writer.close
>> reader.read
=> "hello world"
>> reader.read
=> ""

Ok, so how would we connect a pipe between a parent process and one of its children? Because a fork gets its own copies of the parent’s variables and open file descriptors, we only need to create the pipe once, before forking. However, both processes now hold both ends of the pipe (reader and writer), so each one should close the end it does not use. Here is how you would send data from a child to its parent.

child_parent_pipe.rb
# creates a fork and pipes a string from child to parent

reader,writer = IO.pipe

fork do
  reader.close
  writer.puts "sent from child process"
end

writer.close
from_child = reader.gets
puts from_child

We can also make it work the other way. Just make sure to close the writer on the end that will read and the reader on the end that will write.

parent_child_pipe.rb
# creates a fork and pipes a string from parent to child

reader,writer = IO.pipe

fork do
  writer.close
  from_parent = reader.gets
  puts from_parent
end

reader.close
# puts appends the "\n" that the child's gets is waiting for
writer.puts "sent from parent process"
writer.close
Process.wait
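
Since a pipe only flows one way, a round trip needs two of them. Here is a minimal sketch of that idea, assuming a helper I am calling ask_child (a made-up name, not part of the scripts above): one pipe carries the request to the child and the other carries the reply back.

```ruby
# Round trip over two pipes: one carries the request, the other the reply.
# `ask_child` is a hypothetical helper used only for this illustration.
def ask_child(question)
  to_child_r, to_child_w = IO.pipe     # parent -> child
  to_parent_r, to_parent_w = IO.pipe   # child -> parent

  pid = fork do
    # The child reads requests and writes replies, so it closes the other two ends
    to_child_w.close
    to_parent_r.close
    request = to_child_r.gets.chomp
    to_parent_w.puts "child #{Process.pid} got: #{request}"
  end

  # The parent keeps the opposite pair
  to_child_r.close
  to_parent_w.close
  to_child_w.puts question
  reply = to_parent_r.gets.chomp
  Process.wait(pid)
  [to_child_w, to_parent_r].each(&:close)
  reply
end

puts ask_child("ping")
```

Four ends across two processes is easy to get wrong, which is a good part of the appeal of the bidirectional mechanisms that follow.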

UNIX-Sockets

You can think of UNIX domain sockets as pipes with two big advantages:

  • UNIX sockets are bidirectional whereas pipes are unidirectional
  • Pipes can only use byte streams, but UNIX sockets can also use datagrams

Unlike regular sockets, they can only communicate data between two points on the same machine, using the filesystem as their address space. As a result, they can be very lightweight, making them a good fit for interprocess communication.

unix_sockets.rb
# creates a pair of UNIX sockets that send and receive a string

require 'socket'
parent_socket, child_socket = UNIXSocket.pair

fork do
  parent_socket.close
  child_socket.send("sent from child (#{$$})", 0)
  from_parent = child_socket.recv(100)
  puts from_parent
end

child_socket.close
parent_socket.send("sent from parent (#{$$})", 0)
from_child = parent_socket.recv(100)
puts from_child
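
The datagram option is worth a quick look, because it removes the delimiter problem we ran into with pipes. This is a small sketch, assuming a helper named datagram_messages (my own name for illustration): each send becomes one message, and each recv returns exactly one message regardless of the size requested.

```ruby
require 'socket'

# Sends each message as its own datagram and returns what the parent received.
# `datagram_messages` is a hypothetical helper used only for this illustration.
def datagram_messages(messages)
  parent_socket, child_socket = UNIXSocket.pair(Socket::SOCK_DGRAM)

  pid = fork do
    parent_socket.close
    messages.each { |message| child_socket.send(message, 0) }
  end

  child_socket.close
  # Each recv returns one whole datagram, even though up to 100 bytes were requested
  received = messages.map { parent_socket.recv(100) }
  Process.wait(pid)
  parent_socket.close
  received
end

p datagram_messages(["first", "second message"])
```

With the default stream type, the two strings could arrive glued together in a single recv; with datagrams the message boundaries survive the trip.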

Distributed Ruby

Both pipes and UNIX sockets have a couple of downsides:

  1. They are low-level, byte-transmission mechanisms. For complex behavior you will need to implement existing protocols or define your own.
  2. They cannot communicate across the machine barrier, limiting their usability in massively-scalable scenarios.

Distributed Ruby lets you create and consume what might be called “distributed object services.” These services let you execute code remotely by sending messages to distributed objects.

We execute code on remote machines all the time, but we do it through quite a bit of indirection. For example, let’s say you go to a web address like http://searchforallthestuffs.com/search?q=ponies. Your request hits a router which sees the route you specified and passes the arguments to the corresponding controller, which then executes the code associated with that route (generate an HTML view, JSON, XML, etc).

If your goal is to execute code remotely, this…kinda sucks. Every time you add a new feature, you need a route in place and perhaps a new controller. That’s a lot of overhead just to add a method to a class.

Distributed objects let you execute code remotely, but with an object receiving the message rather than an address.

distributed.rb
# Starts several DRb worker services in this process and waits on all of them concurrently

require 'drb'

NUM_WORKERS = 4

class Worker

  def calculate
    time_to_work = rand(1..7)
    sleep time_to_work
    return time_to_work
  end

  def stop
    DRb.stop_service
  end
end

# Start object services
NUM_WORKERS.times do |i|
  DRb.start_service("druby://:700#{i}", Worker.new)
  puts "Worker running at #{DRb.uri}"
end

# Create a local end-point for each service
workers = NUM_WORKERS.times.map { |i| DRbObject.new nil, "druby://:700#{i}" }

# Concurrently wait for the fastest calculation
thread_pool = []
NUM_WORKERS.times do |i|
  thread_pool << Thread.new do
    answer = workers[i].calculate
    puts "Worker #{i} finished in #{answer} seconds"
  end
end

# Wait for every thread to get its answer
thread_pool.each(&:join)

# Shut down each worker
workers.each { |w| w.stop }
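
The script above starts every service inside the same process that consumes them. To tie this back to forking, here is a hedged sketch that runs the service in a child process instead. The names Adder and with_forked_service are made up for illustration, and I am assuming the drb library is installed (it ships with Ruby, as a bundled gem in recent versions). The child binds to port 0 so the OS picks a free port, then reports the resulting URI to the parent over a pipe.

```ruby
require 'drb'

# A trivial front object; `Adder` is a hypothetical example class.
class Adder
  def add(a, b)
    a + b
  end
end

# Serves `front` from a forked child and yields a remote reference to it.
# `with_forked_service` is a hypothetical helper used only for this illustration.
def with_forked_service(front)
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    DRb.start_service("druby://127.0.0.1:0", front) # port 0: any free port
    writer.puts DRb.uri                             # tell the parent where we are
    writer.close
    DRb.thread.join                                 # serve until terminated
  end

  writer.close
  uri = reader.gets.chomp
  reader.close
  yield DRbObject.new_with_uri(uri)
ensure
  if pid
    Process.kill("TERM", pid)
    Process.wait(pid)
  end
end

with_forked_service(Adder.new) do |adder|
  puts "2 + 3 = #{adder.add(2, 3)} (computed in another process)"
end
```

The parent never calls DRb.start_service here. That is fine for plain arguments like integers, but passing blocks or undumpable objects to the remote side would require the parent to run a service of its own.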

Moving Objects between Processes

Since low-level communication constructs like pipes and sockets transfer bytes, not objects, you will need to encode your objects into a byte format — i.e. serialize them — in order to move them across the process barrier.

Fortunately, Ruby ships with Marshal which can serialize most Ruby objects.

From Ruby-Doc.org: “Some objects cannot be dumped: if the objects to be dumped include bindings, procedure or method objects, instances of class IO, or singleton objects, a TypeError will be raised.”

serialization.rb
# Ruby objects can be serialized by Marshal to move across pipes
# or sockets

Tire = Struct.new(:radius, :pressure)

reader, writer = IO.pipe

fork do
  reader.close
  tire = Tire.new(7, 28)
  tire_data = Marshal.dump(tire)
  writer.write tire_data
end

writer.close
# Marshal data is binary and may contain "\n" bytes, so read until EOF
tire_data = reader.read
tire = Marshal.load(tire_data)
puts tire.inspect
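
If you want to send more than one object through the same pipe, Marshal can frame the messages for you: Marshal.dump accepts an IO as its second argument, and Marshal.load accepts an IO to read from, consuming exactly one object per call. A minimal sketch, assuming a helper named receive_objects (a made-up name):

```ruby
# Streams several objects through one pipe and returns them in the parent.
# `receive_objects` is a hypothetical helper used only for this illustration.
Tire = Struct.new(:radius, :pressure)

def receive_objects(objects)
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    # Marshal.dump(obj, io) writes the serialized bytes straight to the IO
    objects.each { |obj| Marshal.dump(obj, writer) }
  end

  writer.close
  received = []
  begin
    # Marshal.load(io) consumes exactly one object per call
    loop { received << Marshal.load(reader) }
  rescue EOFError
    # The child exited and its writer closed: nothing more to read
  end
  Process.wait(pid)
  reader.close
  received
end

p receive_objects([Tire.new(7, 28), Tire.new(8, 31)])
```

No newline delimiters are involved, so it does not matter whether the serialized bytes happen to contain a "\n".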

Creating a Module for Asynchronous Method Calls

If you can move objects between processes, then you can execute a method in another process and get the result back in the original. This effectively enables asynchronous method execution.

Here, I have a module called Forkable that gives a class the ability to execute methods in parallel. The only thing that is different from before is that the pipe is read from in a new thread, and what comes out of the pipe is yielded to the block.

forkable.rb
# A module that lets you fork a method and get its result in a block
# Notes: 
#   1. Forked methods can't take blocks in this implementation
#   2. Call back threads will be destroyed if the main process exits
#   3. Not production ready. For educational purposes only.
####################################################################

# If included in a class #fork_method will run a method in another process
# and yield the result to a block
module Forkable
  def fork_method(method, *args)
    reader, writer = IO.pipe
    fork do
      reader.close
      result = self.send(method, *args)
      child_data = Marshal.dump(result)
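      # write (not puts) keeps the binary Marshal data unmodified
      writer.write(child_data)
    end

    writer.close
    Thread.new do
      # Marshal data can contain "\n" bytes, so read until the child closes the pipe
      data_from_child = reader.read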
      yield Marshal.load(data_from_child) if block_given?
    end
  end
end

# An object that takes a random amount of time to instantiate
class ExpensiveObject

  attr_reader :expense

  def initialize(max_expense)
    @expense = rand(1..max_expense)
    sleep @expense
  end
end

# A worker that forks, a...forker
class Forker
  include Forkable

  def calculate(max_expense)
    return ExpensiveObject.new(max_expense)
  end
end

# Create 3 ExpensiveObjects and print how long they took to create
f = Forker.new
callbacks = 3.times.map do
  f.fork_method(:calculate, 7) do |result|
    puts "result: #{result.inspect}"
  end
end

puts "waiting on results..."

# Reap the children, then let each callback thread finish printing
Process.waitall
callbacks.each(&:join)
puts "main process finished"

Cod

Cod is a library that aims to make IPC easier in Ruby. In the previous section, I demonstrated how to serialize objects for transfer across byte-transmission mechanisms like pipes or sockets. Cod takes care of this for us. It uses higher-level IPC mechanisms it calls channels. Not only do channels not need to be closed on one end in each process, but they can also transmit Ruby objects. A channel is created in Cod (somewhat oddly) with Cod.pipe.

cod.rb
# The cod gem simplifies IPC. Ruby objects can be sent across channels.
# Installation: 
#   $ gem install cod

require 'cod'

Tire = Struct.new(:radius, :pressure)

channel = Cod.pipe

3.times do
  fork do
    radius = rand(8..14)
    pressure = rand(24..33)
    channel.put Tire.new(radius, pressure)
  end
end

tires = 3.times.map { channel.get }
puts tires.inspect

The cod gem does more than I can cover here (for example, it has beanstalkd integration). The website has a lot of examples, so definitely check it out.

Pre-forking Servers

Let’s look at a common example of forking in practice: servers. Some servers that use forking to achieve parallelism include:

  • Apache (Prefork MultiProcessing Module)
  • Unicorn
  • Rainbows! (based on Unicorn)

Specifically, forking servers generally use pre-forking. The concept works like this: thanks to Copy-on-Write, forking a process is cheaper than copying all of its memory, but it’s not free. Since running a web server generally consists of opening and closing lots of short-lived connections, it might not be a good idea to wait to create a new child process until a new connection is accepted. Instead, we can go ahead and fork a few times and let the forks wait for connections. You will often see this described as running a “pool of processes.”

So what might that look like?

preforking.rb
# Starts an echo server that can service 3 clients in parallel

require 'socket'

server = TCPServer.new 'localhost', 3000

trap("EXIT") { server.close }
trap("INT") { exit }

3.times do
  fork do
    sock = server.accept
    sock.puts "You are connected to process #{$$}:"
    while recv = sock.gets
      sock.puts("ECHO:> #{recv}")
    end
  end
end

Process.waitall

If you run this code, you should be able to connect on 3 separate terminals.

telnet localhost 3000
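
In the script above, each worker accepts a single connection and exits once that client disconnects. A pool usually wants workers that go back for more. Here is a hedged sketch of that variation, assuming a helper named start_echo_pool (a made-up name). It binds to port 0 so the OS chooses a free port, and it plays the role of the telnet client itself.

```ruby
require 'socket'

# Forks `count` workers that share one listening socket. Each worker loops on
# accept, so it serves one client after another until it is terminated.
# `start_echo_pool` is a hypothetical helper used only for this illustration.
def start_echo_pool(server, count)
  count.times.map do
    fork do
      loop do
        sock = server.accept
        sock.puts "You are connected to process #{Process.pid}:"
        while (line = sock.gets)
          sock.puts "ECHO:> #{line}"
        end
        sock.close
      end
    end
  end
end

server = TCPServer.new('127.0.0.1', 0) # port 0 asks the OS for any free port
pids = start_echo_pool(server, 3)
puts "listening on port #{server.addr[1]}"

client = TCPSocket.new('127.0.0.1', server.addr[1])
puts client.gets      # greeting from whichever worker accepted the connection
client.puts "hello"
puts client.gets      # the echoed line
client.close

# Shut the pool down and reap the workers
pids.each { |pid| Process.kill("TERM", pid) }
Process.waitall
```

All workers block in accept on the same socket, and the kernel hands each incoming connection to one of them. A real server would also respawn workers that die, which this sketch leaves out.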

The Parallel Gem

The parallel gem provides the ability to achieve parallelism in CRuby (MRI) without getting into the nitty-gritty of forks and IPC. Here is a quick way to spool up all of your CPU cores with parallel.

parallel.rb
# Parallel iterates across collections with processes or threads
# Installation: 
#   $ gem install parallel

require 'parallel'

def calculate(magnitude)
  x = 0
  cycles = 10 ** magnitude
  cycles.times do
    x += 1.000001
  end
  return x
end

results = Parallel.map([6, 6, 6, 6, 7, 7, 7, 7]) do |mag|
  calculate(mag)
end

puts results

SitePoint already has a great article about the parallel gem that explores its inner workings (at least as of 2011).

Programmers often avoid looking at source code out of fear of it turning into a massive time sink. The parallel gem, however, is a fairly small, single-file library. There’s a great opportunity to learn and contribute.
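
To get a feel for what a process-based map involves, here is a rough sketch built only from the pieces covered in this article: fork, a pipe per child, and Marshal. The fork_map name is made up, and this is not how the parallel gem is written; it simply forks one child per element with no limit on the number of processes.

```ruby
# Maps a block over a collection with one forked child per element.
# `fork_map` is a hypothetical helper; it is not the parallel gem's implementation.
def fork_map(items)
  jobs = items.map do |item|
    reader, writer = IO.pipe
    pid = fork do
      reader.close
      # Run the caller's block in the child and ship the result back
      Marshal.dump(yield(item), writer)
    end
    writer.close
    [pid, reader]
  end

  # Collect results in the original order, reaping each child as we go
  jobs.map do |pid, reader|
    result = Marshal.load(reader)
    reader.close
    Process.wait(pid)
    result
  end
end

p fork_map([1, 2, 3, 4]) { |n| n * n }
```

Everything the block returns has to survive Marshal, so the restrictions from the serialization section apply to results here as well.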

Conclusion

Even as other interpreters (JRuby, Rubinius) operate freely without a Global Interpreter Lock, a process-centric approach to concurrency is still useful. The more vertical an approach to scaling is, the more limited it will be. Running multiple threads on multiple cores might be more horizontal than increasing clock speed, but it is still more vertical than running multiple machines. Running 100 single core instances is more granular than running 25 quad core instances. Thinking in terms of processes sending messages enables you to embrace that kind of granularity. It won’t be easy, but once it works with a cluster of 10 instances, why not 100? Or 1000? Why not have 500 on one provider and 500 on another? They’re just processes sending messages.

Spending a bunch of money up front on resources you may or may not use represents the old way of thinking. Why buy more than exactly what you need?

Frequently Asked Questions (FAQs) about Forking and IPC in Ruby

What is the main difference between forking and threading in Ruby?

Forking and threading are both methods of achieving concurrency in Ruby, but they work in fundamentally different ways. Forking creates a new process that is a copy of the current one, with its own memory space. This means that changes made in the child process do not affect the parent process. Threading, on the other hand, creates multiple threads of execution within the same process, sharing the same memory space. This means that changes made in one thread can affect other threads. While forking can provide more isolation and can take advantage of multiple processors, it is also more resource-intensive than threading.

How can I communicate between parent and child processes in Ruby?

Interprocess communication (IPC) in Ruby can be achieved through several methods, including pipes, sockets, and shared memory. Pipes are a simple and common method of IPC, allowing data to be written to one end and read from the other. Sockets provide a more flexible method of IPC, allowing communication between processes on different machines. Shared memory allows for direct access to a common memory area, but requires careful synchronization to avoid conflicts.

What are the potential pitfalls of using forking in Ruby?

While forking can provide a powerful method of achieving concurrency in Ruby, it also comes with its own set of challenges. One of the main pitfalls is the potential for ‘zombie’ processes. These are child processes that have finished execution but have not been properly reaped by the parent process. If left unchecked, these can consume system resources and lead to performance issues. Another potential pitfall is the overhead associated with creating a new process, which can be significant if a large number of processes are being created.

How can I avoid ‘zombie’ processes when using forking in Ruby?

‘Zombie’ processes can be avoided by ensuring that the parent process properly reaps its child processes once they have finished execution. This can be done using the Process.wait method, which waits for a child process to exit and then reaps it. If you have multiple child processes, you can use Process.waitall to wait for and reap all child processes.
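
As a small illustration of reaping, the sketch below shows a blocking approach with Process.wait2 and a non-blocking one with Process.detach, which starts a background thread that reaps the child for you. The helper names run_and_reap and run_detached are made up for this example.

```ruby
# Two ways to make sure finished children get reaped.
# `run_and_reap` and `run_detached` are hypothetical helpers for illustration.

# Blocks until the child exits, then returns its exit code
def run_and_reap
  pid = fork { yield }
  _, status = Process.wait2(pid)
  status.exitstatus
end

# Returns immediately; a background thread reaps the child when it exits
def run_detached
  pid = fork { yield }
  Process.detach(pid) # returns the reaping Thread
end

puts "exit code: #{run_and_reap { exit 3 }}"

watcher = run_detached { sleep 0.1 }
puts "child succeeded: #{watcher.value.success?}"
```

Process.detach suits fire-and-forget children: you do not have to wait, and the thread's value still gives you the Process::Status if you want it later.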

How can I use sockets for IPC in Ruby?

Sockets provide a flexible method of IPC in Ruby, allowing for communication between processes on different machines. To use sockets, you first need to create a socket using the Socket.new method. You can then use the connect method to connect to a remote socket, and the bind and listen methods to set up a local socket to accept connections. Data can be sent and received using the send and recv methods.

How can I use shared memory for IPC in Ruby?

Shared memory is a method of IPC that allows for direct access to a common memory area. In Ruby, shared memory can be achieved using the mmap library, which provides a Mmap class that can be used to create a shared memory object. This object can then be read from and written to like a normal array, but the data is stored in a shared memory area that can be accessed by other processes.

How can I handle errors when using forking in Ruby?

Error handling around fork takes two forms. Wrapping the fork call itself in begin and rescue catches failures to create the process, such as the system hitting its process limit. An exception raised inside the child, however, stays in the child: the parent only sees the child’s exit status (for example via Process.wait and $?), so anything more detailed has to be sent back over an IPC channel such as a pipe.
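
One way to get a child's failure back to the parent is to send an outcome over a pipe and re-raise it there. This is only a sketch under my own assumptions: ChildError and run_in_child are made-up names, and the child reports the exception as a string rather than marshalling the exception object itself.

```ruby
# Runs the block in a child and re-raises a child failure in the parent.
# `ChildError` and `run_in_child` are hypothetical names for illustration.
class ChildError < StandardError; end

def run_in_child
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    # Capture either the result or a description of what went wrong
    outcome = begin
      [:ok, yield]
    rescue StandardError => e
      [:error, "#{e.class}: #{e.message}"]
    end
    Marshal.dump(outcome, writer)
  end

  writer.close
  status, payload = Marshal.load(reader)
  reader.close
  Process.wait(pid)
  raise ChildError, payload if status == :error
  payload
end

puts run_in_child { 6 * 7 }

begin
  run_in_child { Integer("not a number") }
rescue ChildError => e
  puts "child failed: #{e.message}"
end
```

If the child dies without writing anything (for example, it is killed by a signal), Marshal.load raises EOFError in the parent, so a sturdier version would rescue that too and inspect the exit status.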

How can I use pipes for IPC in Ruby?

Pipes are a simple and common method of IPC in Ruby. To use pipes, you can use the IO.pipe method to create a pair of pipe endpoints. Data can be written to one end of the pipe using the write or puts method, and read from the other end using gets for newline-terminated messages, or using read once the writer has been closed.

How can I use threads for concurrency in Ruby?

Threads provide a lightweight method of achieving concurrency in Ruby. To create a new thread, you can use the Thread.new method, passing in a block of code to be executed in the thread. You can then use the join method to wait for the thread to finish execution. It’s important to note that due to the Global Interpreter Lock (GIL) in CRuby (MRI), threads running Ruby code there do not truly run in parallel, but rather interleave their execution; implementations without a GIL, such as JRuby, can run threads in parallel.

How can I synchronize access to shared resources in Ruby?

Synchronization in Ruby can be achieved using several methods, including mutexes, condition variables, and queues. Mutexes provide a simple method of ensuring that only one thread can access a shared resource at a time. Condition variables can be used to signal between threads, allowing one thread to wait for a condition to be met by another thread. Queues provide a thread-safe way of passing data between threads.

Robert Qualls

Robert is a voracious reader, Ruby aficionado, and other big words. He is currently looking for interesting projects to work on and can be found at his website.
