Background

Elixir and OTP excel at asynchronous and long-running tasks, so they also seem like a great fit for interfacing with the non-Elixir world, in particular for shelling out to a long-running external OS process and monitoring it.

In the Ruby world, we often reach for tools like Sidekiq or Resque to run these commands asynchronously, outside of the normal HTTP request/response cycle of a Rails app or API.

Tasks like video encoding (ffmpeg), photo processing (krpano), or other random “weird” utilities are good examples (rtl-fm or multimon-ng are on my radar to play around with).

Many of these tools report progress on STDOUT or STDERR, and there’s a compelling user experience case for being able to capture this output, and display progress in a web browser in real-time. Elixir and OTP give us several great tools to approach this problem.

There are also other packages in Hex (e.g. Porcelain) that aim to make working with ports easier, but I eventually settled on dealing directly with Ports and GenServer myself, so I could understand how it was working on a lower level.

This article assumes a basic understanding of Elixir, GenServer and *nix shell scripting and processes. It also assumes you have a working Ruby installation, but $YOUR_FAVORITE_LANGUAGE can be substituted for the Ruby script below.

What’s a Port?

From the Elixir docs:

Ports provide a mechanism to start operating system processes
external to the Erlang VM and communicate with them via message passing.

That sounds exactly like what we need! Fortunately the Elixir docs are pretty good.
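
Before building anything bigger, here is a minimal sketch of that message passing. It is not part of the example project, and it assumes a *nix system with echo on the PATH. The process that opens the port receives the command's output as ordinary messages:

```elixir
# Open a port to a short-lived command. The calling process becomes
# the port's owner and receives its messages.
port = Port.open({:spawn, "echo hello"}, [:binary, :exit_status])

# The command's STDOUT arrives as {port, {:data, binary}} messages.
output =
  receive do
    {^port, {:data, data}} -> data
  after
    5_000 -> raise "no data received from port"
  end

# Because we asked for :exit_status, we are also told when the command exits.
status =
  receive do
    {^port, {:exit_status, code}} -> code
  after
    5_000 -> raise "no exit status received from port"
  end

IO.inspect({output, status})
```

The rest of the article does the same thing, except that a GenServer owns the port and the messages show up in handle_info instead of a bare receive.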

mix new

Let’s create a new mix project to give us some structure to play around with.

$ mix new ports_example
* creating README.md
* creating .formatter.exs
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/ports_example.ex
* creating test
* creating test/test_helper.exs
* creating test/ports_example_test.exs

Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:

    cd ports_example
    mix test

Run "mix help" for more commands.

$ cd ports_example

A long-running command

Next, we need a long-running command to monitor. Let’s say you’ve got a program that takes a while to finish, occasionally writes its progress to STDOUT, and then exits to the shell with a zero return status, or non-zero if there’s an error.

Here’s a Ruby script that does just that, but any program that operates similarly would work.

Make a directory at the top of your new mix project called bin/, put the following script there, and make it executable (chmod +x bin/long_running.rb) so it can be run directly:

bin/long_running.rb

#!/usr/bin/env ruby

puts "Starting up"
STDOUT.flush

TOTAL = 10

TOTAL.times do |n|
  sleep 1
  puts "Progress: step #{n+1} of #{TOTAL}"
  # Flush after every line so the Port sees it right away
  STDOUT.flush
end

puts "Done"
STDOUT.flush

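The only thing of note is that we need to flush STDOUT after each line. Ruby buffers STDOUT when it isn’t attached to a terminal, so without the flush the Port would not see the updates line-by-line.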

Port of Entry

Now we need some code to open the Port to the process and get updates!

lib/ports_example/basic_port.ex

defmodule PortsExample.BasicPort do
  use GenServer
  require Logger

  @command "./bin/long_running.rb"

  # GenServer API
  def start_link(args \\ [], opts \\ []) do
    GenServer.start_link(__MODULE__, args, opts)
  end

  def init(_args) do
    # The process that opens the port becomes its owner and receives its messages
    _port = Port.open({:spawn, @command}, [:binary, :exit_status])

    {:ok, %{latest_output: nil, exit_status: nil}}
  end

  # This callback handles data incoming from the command's STDOUT
  def handle_info({_port, {:data, text_line}}, state) do
    latest_output = String.trim(text_line)

    Logger.info "Latest output: #{latest_output}"

    {:noreply, %{state | latest_output: latest_output}}
  end

  # This callback tells us when the process exits
  def handle_info({_port, {:exit_status, status}}, state) do
    Logger.info "External exit: :exit_status: #{status}"

    {:noreply, %{state | exit_status: status}}
  end

  # no-op catch-all callback for unhandled messages
  def handle_info(_msg, state), do: {:noreply, state}
end

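There’s a bit of GenServer boilerplate here, but most of the meat is in init/1, where we call Port.open/2 to start the external command. The process that opens the port becomes its owner, so our GenServer is the one that receives the port’s messages.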

Our GenServer will now get messages as a tuple in the format of {port, {:data, text}} that we can deal with in a handle_info callback.

If you’ve used GenServer, you’re probably familiar with the handle_cast and handle_call callbacks, but possibly not handle_info. handle_info is called asynchronously when a message is sent directly to a process, e.g. by using Kernel.send/2 or after some amount of time using Process.send_after/4.
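
If handle_info is new to you, here is a small sketch that has nothing to do with ports yet. InfoDemo is a made-up module for illustration: it simply reports every plain message it receives back to the process that started it.

```elixir
defmodule InfoDemo do
  use GenServer

  @impl true
  def init(caller), do: {:ok, caller}

  # Any message that is not a call or a cast ends up here.
  @impl true
  def handle_info(msg, caller) do
    send(caller, {:handled, msg})
    {:noreply, caller}
  end
end

{:ok, pid} = GenServer.start_link(InfoDemo, self())

# Delivered right away
send(pid, :hello)
# Delivered after roughly 100 ms
Process.send_after(pid, :later, 100)

results =
  for expected <- [:hello, :later] do
    receive do
      {:handled, ^expected} -> expected
    after
      1_000 -> raise "timed out waiting for #{inspect(expected)}"
    end
  end

IO.inspect(results)
```

Port messages reach the GenServer the same way: they are plain messages sent to the owning process, which is why they land in handle_info.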

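The first argument, {:spawn, @command}, will run the external program @command. There’s also a {:spawn_executable, path} form, which takes the path to the executable and receives its arguments separately through the :args option, so it copes with paths or arguments that contain spaces.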
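
As a rough sketch of the :spawn_executable form (assuming a *nix system with sh on the PATH), the snippet below asks a tiny shell script how many arguments it received. The script text and the argument values are made up for illustration:

```elixir
# :spawn_executable needs the full path to the program.
sh = System.find_executable("sh")

# Each element of :args is passed to the program as one argument,
# so "two words" stays a single argument despite the space.
# The "_" fills the script's $0 slot and is not counted by $#.
port =
  Port.open({:spawn_executable, sh}, [
    :binary,
    :exit_status,
    args: ["-c", "echo $#", "_", "two words", "three"]
  ])

count =
  receive do
    {^port, {:data, data}} -> String.trim(data)
  after
    5_000 -> raise "no data received from port"
  end

IO.puts("The script saw #{count} arguments")
```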

The second argument is the options.

In the options, :binary means you’ll get back ‘binary’ data (instead of lists of bytes).
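
To see the difference :binary makes, here is a quick sketch (again assuming echo is on the PATH). run_echo is just a throwaway helper name for this example:

```elixir
# Runs `echo hi` with the given port options and returns the first
# chunk of output the port delivers.
run_echo = fn opts ->
  port = Port.open({:spawn, "echo hi"}, opts)

  receive do
    {^port, {:data, data}} -> data
  after
    5_000 -> raise "no data received from port"
  end
end

as_binary = run_echo.([:binary])
as_list = run_echo.([])

# With :binary we get an Elixir string
IO.inspect(as_binary)
# Without it we get a list of byte values
IO.inspect(as_list, charlists: :as_lists)
```

Since functions like String.trim/1 expect binaries, :binary is usually what you want when the output is text.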

:exit_status is a nice option to include when you want to be notified that the program exited. We deal with this message in the second handle_info clause above, the one that matches {:exit_status, status}, where the variable status is the numeric exit code of the command. Normal exits will be 0, and non-zero exit codes indicate a failure.
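
Here is a small sketch of a failing command, assuming sh is available. The exit code 3 is arbitrary and only there to show a non-zero status coming through:

```elixir
sh = System.find_executable("sh")

# Run a command that fails on purpose with exit code 3.
port =
  Port.open({:spawn_executable, sh}, [:binary, :exit_status, args: ["-c", "exit 3"]])

status =
  receive do
    {^port, {:exit_status, code}} -> code
  after
    5_000 -> raise "no exit status received from port"
  end

case status do
  0 -> IO.puts("command succeeded")
  code -> IO.puts("command failed with exit code #{code}")
end
```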

Let’s run it!

OK, enough explaining. Let’s run this example!

Open up an iex prompt inside the project with iex -S mix, so the module is compiled and loaded:

iex(27)> {:ok, pid} = PortsExample.BasicPort.start_link()
{:ok, #PID<0.269.0>}
iex(34)>
21:32:33.782 [info]  Latest output: Starting up
21:32:34.785 [info]  Latest output: Progress: step 1 of 10
21:32:35.787 [info]  Latest output: Progress: step 2 of 10
21:32:36.790 [info]  Latest output: Progress: step 3 of 10
21:32:37.794 [info]  Latest output: Progress: step 4 of 10
21:32:38.798 [info]  Latest output: Progress: step 5 of 10
21:32:39.801 [info]  Latest output: Progress: step 6 of 10
21:32:40.805 [info]  Latest output: Progress: step 7 of 10
21:32:41.808 [info]  Latest output: Progress: step 8 of 10
21:32:42.811 [info]  Latest output: Progress: step 9 of 10
21:32:43.812 [info]  Latest output: Progress: step 10 of 10
21:32:43.813 [info]  Latest output: Done
21:32:43.814 [info]  External exit: :exit_status: 0

Cool! Not only were we notified of each bit of text coming in from the external process, we were also notified when the process exited, as well as the exit code from the shell (in this case, zero).

We can also inspect the GenServer’s state by using :sys.get_state/1:

iex> :sys.get_state(pid)
%{exit_status: 0, latest_output: "Done"}

Monitoring the port

We can also be notified when the command exits in other ways by using Port.monitor/1. Once the port is monitored, your GenServer receives a message of the form {:DOWN, ref, :port, port, reason} in handle_info when the port goes down:

defmodule PortsExample.MonitoredPort do
  use GenServer
  require Logger

  @command "./bin/long_running.rb"

  # GenServer API
  def start_link(args \\ [], opts \\ []) do
    GenServer.start_link(__MODULE__, args, opts)
  end

  def init(_args) do
    port = Port.open({:spawn, @command}, [:binary, :exit_status])
    Port.monitor(port)

    {:ok, %{port: port, latest_output: nil, exit_status: nil} }
  end

  # This callback handles data incoming from the command's STDOUT
  def handle_info({port, {:data, text_line}}, %{port: port} = state) do
    Logger.info "Data: #{inspect text_line}"
    {:noreply, %{state | latest_output: String.trim(text_line)}}
  end

  # This callback tells us when the process exits
  def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
    Logger.info "Port exit: :exit_status: #{status}"

    new_state = %{state | exit_status: status}

    {:noreply, new_state}
  end

  def handle_info({:DOWN, _ref, :port, port, :normal}, state) do
    Logger.info "Handled :DOWN message from port: #{inspect port}"
    {:noreply, state}
  end

  def handle_info(msg, state) do
    Logger.info "Unhandled message: #{inspect msg}"
    {:noreply, state}
  end
end

Now, running it in IEx, we see the :DOWN message being handled at the end of our output:

17:30:43.203 [info]  Data: "Progress: step 9 of 10\n"
17:30:43.304 [info]  Data: "Progress: step 10 of 10\n"
17:30:43.304 [info]  Data: "Done\n"
17:30:43.306 [info]  Port exit: :exit_status: 0
17:30:43.306 [info]  Handled :DOWN message from port: #Port<0.53>

You can use Port.monitor when you want additional information about when the command you’re running exits. I’m not entirely sure what the benefit is of using it over using the :exit_status message, since you get a bit more information about the command’s shell exit code with the :exit_status method.
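
To poke at the monitor on its own, outside of a GenServer, here is a small sketch. It assumes cat is on the PATH and closes the port by hand instead of waiting for the command to finish:

```elixir
# `cat` keeps running until its STDIN is closed, so the port stays open.
port = Port.open({:spawn, "cat"}, [:binary])
ref = Port.monitor(port)

# Closing the port triggers the monitor.
Port.close(port)

down_reason =
  receive do
    {:DOWN, ^ref, :port, ^port, reason} -> reason
  after
    5_000 -> raise "no :DOWN message received"
  end

IO.inspect(down_reason)
```

Note that this port was opened without :exit_status, so the :DOWN message is the only notification we get here.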

Trapping process exits

What happens when something forces your (GenServer) process to quit or crash? We can use Process.flag(:trap_exit, true) to be notified about this part of our GenServer’s lifecycle, and attempt to clean up the running command.

Let’s also keep track of the port in our GenServer’s state so we can get some additional information about it:

lib/ports_example/trap_process_crash.ex

defmodule PortsExample.TrapProcessCrash do
  use GenServer
  require Logger

  @command "./bin/long_running.rb"

  # GenServer API
  def start_link(args \\ [], opts \\ []) do
    GenServer.start_link(__MODULE__, args, opts)
  end

  def init(_args) do
    Process.flag(:trap_exit, true)

    port = Port.open({:spawn, @command}, [:binary, :exit_status])
    Port.monitor(port)

    {:ok, %{port: port, latest_output: nil, exit_status: nil} }
  end

  def terminate(reason, %{port: port} = state) do
    Logger.info "** TERMINATE: #{inspect reason}. This is the last chance to clean up after this process."
    Logger.info "Final state: #{inspect state}"

    port_info = Port.info(port)
    os_pid = port_info[:os_pid]

    Logger.warning "Orphaned OS process: #{os_pid}"

    :normal
  end

  # This callback handles data incoming from the command's STDOUT
  def handle_info({port, {:data, text_line}}, %{port: port} = state) do
    Logger.info "Data: #{inspect text_line}"
    {:noreply, %{state | latest_output: String.trim(text_line)}}
  end

  # This callback tells us when the process exits
  def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
    Logger.info "Port exit: :exit_status: #{status}"

    new_state = %{state | exit_status: status}

    {:noreply, new_state}
  end

  def handle_info({:DOWN, _ref, :port, port, :normal}, state) do
    Logger.info "Handled :DOWN message from port: #{inspect port}"
    {:noreply, state}
  end

  def handle_info({:EXIT, _port, :normal}, state) do
    Logger.info "handle_info: EXIT"
    {:noreply, state}
  end

  def handle_info(msg, state) do
    Logger.info "Unhandled message: #{inspect msg}"
    {:noreply, state}
  end
end

The main differences here are the addition of Process.flag(:trap_exit, true) at the top of init/1 and the corresponding implementation of terminate/2, which is called when our GenServer process is told to exit, e.g. by calling Process.exit(pid, :normal). Now, let’s start TrapProcessCrash and force it to exit while the command is still running:

{:ok, pid} = PortsExample.TrapProcessCrash.start_link()

17:56:16.211 [info]  Data: "Starting up\n"
17:56:17.214 [info]  Data: "Progress: step 1 of 10\n"
17:56:18.214 [info]  Data: "Progress: step 2 of 10\n"
17:56:19.215 [info]  Data: "Progress: step 3 of 10\n"

iex(15)> Process.exit(pid, :normal)

17:50:11.138 [info]  ** TERMINATE: :normal. This is the last chance to clean up after this process.
17:50:11.138 [info]  Final state: %{exit_status: nil, latest_output: "Progress: step 1 of 10", port: #Port<0.77>}
17:50:11.138 [warn]  Orphaned OS process: 21094

Clearly, an orphaned process is a Bad Thing™, so scope out the “Zombie Processes” Caveat below!
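
One way to go a step further than logging is to ask the OS to stop the command. The following is only a sketch under a few assumptions: PortCleanup is a hypothetical helper that is not in the example repo, it relies on a *nix kill program being on the PATH, and it sends the default SIGTERM, which a stubborn program may ignore.

```elixir
defmodule PortCleanup do
  # Hypothetical helper: ask the OS to terminate the external program
  # behind `port`, if it is still around.
  def kill_os_process(port) do
    case Port.info(port, :os_pid) do
      {:os_pid, os_pid} when is_integer(os_pid) ->
        # `kill` sends SIGTERM by default.
        {_output, kill_status} = System.cmd("kill", [Integer.to_string(os_pid)])
        {:killed, os_pid, kill_status}

      _ ->
        # The port is already closed, or has no OS process attached.
        :no_os_process
    end
  end
end

# Usage: start a command that would otherwise run for 30 seconds.
sleep = System.find_executable("sleep")
port = Port.open({:spawn_executable, sleep}, [:binary, :exit_status, args: ["30"]])

result = PortCleanup.kill_os_process(port)
IO.inspect(result)

# The port reports the exit shortly after the signal is delivered.
exited =
  receive do
    {^port, {:exit_status, _status}} -> true
  after
    5_000 -> false
  end

IO.inspect(exited)
```

A terminate/2 callback like the one above could call PortCleanup.kill_os_process(port) in place of the "Orphaned OS process" warning. This still does not help if the whole VM dies, which is what the caveat below is about.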

Other thoughts + that’s a wrap!

It might be advantageous to use Port.info/1 right when you open the port and store the operating system process ID in the state. But as long as you have the reference to the port somewhere, you’ll have access to it.

And that’s the basics of dealing with external processes in Elixir using the Port module. There are a few other useful functions in the Port module, most notably Port.command/3 which allows you to send data back to the external command. This could be useful if you have a program that requires some sort of interactivity or user input.
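
As a quick sketch of sending data the other way, here is Port.command/2 talking to cat, which echoes its STDIN back to STDOUT (assuming cat is on the PATH):

```elixir
# `cat` echoes whatever it reads on STDIN back to STDOUT.
port = Port.open({:spawn, "cat"}, [:binary])

# Send data to the command's STDIN.
Port.command(port, "hello from Elixir\n")

echoed =
  receive do
    {^port, {:data, data}} -> data
  after
    5_000 -> raise "no data received from port"
  end

IO.inspect(echoed)

# Close the port when we are done, which also closes cat's STDIN.
Port.close(port)
```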

Caveats

Argument sanitizing

Do not pass user input directly through to the command in Port.open! If your call looks something like Port.open({:spawn, "/path/to/bin #{user_input}"}, opts), where user input is interpolated into the command string, this is a hint that you might have a potential security hole.
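
One possible mitigation, sketched below, is to keep user input out of the command string entirely and hand it over as a single argument with :spawn_executable. SafeCommand is a made-up module name, and echo stands in for whatever program you are really running:

```elixir
defmodule SafeCommand do
  # Hypothetical helper: runs `echo` with `user_input` as ONE argument.
  # The input is never interpolated into a command string.
  def echo(user_input) when is_binary(user_input) do
    executable = System.find_executable("echo")

    port =
      Port.open({:spawn_executable, executable}, [
        :binary,
        :exit_status,
        args: [user_input]
      ])

    receive do
      {^port, {:data, data}} -> data
    after
      5_000 -> raise "no data received from port"
    end
  end
end

# The "; echo injected" part is printed back literally instead of being run.
output = SafeCommand.echo("photo.jpg; echo injected")
IO.inspect(output)
```

This only keeps the input from being treated as extra commands. The program itself may still interpret the argument in surprising ways (an input that looks like a command-line flag, for example), so validating user input is still a good idea.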

Zombie processes

The GenServer is keeping an eye on the Ruby script, and it can kill it off, but what happens if the entire VM goes away? The Elixir docs present a simple, wrapper script-based solution for ensuring processes go away after they’re finished.

Code repo

The code for the examples in this article is located here: tonyc/elixir_ports_example

Thanks

Special thanks to Graham McIntire, Steve Hebert and Jared Smith for reviewing this article.