Note: this post was migrated from my old Tumblr-backed blog
The more I’ve been working with large-scale systems and writing code that I aim to be as fault tolerant as possible, the more I’ve become enamored with Erlang and Elixir’s pattern matching style for handling data flow. I’ve had several occasions where I needed solutions that both provided real-time responses to the client and could scale. These are two things at which Erlang, and by extension Elixir, excels.
In the past I had only created some terminal programs in Elixir, and I wanted to experiment with a web application, so I did some investigating and came across Dynamo, Elixir’s analog to Ruby’s Sinatra. Following the documentation on GitHub, getting up and running is very straightforward, and it supports OTP, which means that I can build individually scalable modules that can be supervised however I want.
I thought about making a chat app, but everyone does that, and I wanted something relatively simple that I could crank out in a night or two of light coding, so I settled on a basic login tracker. Essentially, it would store a list of users (without duplicates), allow them to “sign in” and “sign out”, and let an arbitrary number of subscribers hook up to an EventSource and receive login/disconnect events.
What I wanted to accomplish
The goals of my exercise were as follows:
- use an OTP app to persist data while the application is running
- enable EventSource so I could stream add/remove events to a client in real-time by connecting to a URI
- have clients add/remove “users” in a persistent datastore via an HTTP API.
- have the OTP app send events out to notify clients of changes to the list of users.
I had never written an OTP app, but I had read the OTP chapters of both the O’Reilly Erlang book and PragProg’s Elixir book, so I had some basic ideas of the concepts. I had also never worked with EventSource and had no idea about the technical details of its implementation, but I did have a good idea of what it’s used for and how to use it.
After looking online a bit, I found 2 excellent tutorials that really got me started:
- http://miguelcamba.com/blog/2013/04/29/tutorial-build-a-web-app-using-elixir-and-dynamo-with-streaming-and-concurrency/
- http://benjamintanweihao.github.io/blog/2013/08/14/elixir-for-the-lazy-impatient-and-busy-part-5-streams-streaming-dynamo/
I was having a lot of trouble putting everything together until I came across these blog posts, but after seeing their approaches, I was able to get something working and it all began to click.
Creating the OTP UserList
After creating my base Dynamo app, I needed to create an OTP app to persist my data for as long as the app is running. The main Dynamo process would communicate with this app by sending messages via :gen_server.cast/2 and :gen_server.call/2.
This app just needed to persist a list of strings representing connected users, return the list, add to the list (skipping duplicates) and remove users from the list.
To do that, I created my skeleton OTP server code at lib/elixir_webtest/user_store.ex:
```elixir
defmodule ElixirWebtest.UserStore do
  use GenServer.Behaviour

  def start_link( users ) do
    :gen_server.start_link({:local, :user_store}, __MODULE__, users, [])
  end

  def init( users ) do
    {:ok, users}
  end
end
```
That code initializes the UserStore with the supplied default state (an empty list, in our case) and registers it under a local name so it can be called as :user_store.
I then added 2 functions for working with the state:
```elixir
defp add_user( users, new_user ) do
  if Enum.any?( users, fn(x) -> x == new_user end ) do
    users
  else
    [new_user|users]
  end
end

defp del_user( users, user ) do
  if Enum.any?( users, fn(x) -> x == user end ) do
    List.delete( users, user )
  else
    users
  end
end
```
The above functions will be called by the OTP cast/call handlers to add users to or remove users from the list. I didn’t want any user to appear in the list twice, so both functions first check the list using the Enum.any?/2 function.
In del_user/2, there is technically no reason to check for the existence of user, since List.delete/2 will just return the original list if user doesn’t exist, but I’m leaving room for broadcasting changes to subscribers, and I only want to broadcast the change if a user is actually deleted from the list.
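To see how these two helpers behave on their own, here is a minimal sketch that lifts them into a hypothetical public module, UserListSketch, so they can be tried in iex or a script. The logic is the same as above; only the module name and the public visibility are assumptions made for illustration.

```elixir
# Hypothetical standalone module; in the post these are private helpers
# inside ElixirWebtest.UserStore.
defmodule UserListSketch do
  # Prepend new_user unless it is already in the list (no duplicates).
  def add_user(users, new_user) do
    if Enum.any?(users, fn x -> x == new_user end) do
      users
    else
      [new_user | users]
    end
  end

  # Remove user if present. List.delete/2 alone would also return the
  # unchanged list for a missing user; the explicit check is the spot
  # where a broadcast can be hooked in later.
  def del_user(users, user) do
    if Enum.any?(users, fn x -> x == user end) do
      List.delete(users, user)
    else
      users
    end
  end
end

users = UserListSketch.add_user([], "spike")
users = UserListSketch.add_user(users, "spike")
IO.inspect(users)
# prints ["spike"]
```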
Adding the OTP message handlers
The next step is to add the actual OTP handlers to this module. These are the functions that receive messages from clients and respond to them. We need to create 3 handlers: one handle_call/3, which will respond with the list of users, and 2 handle_cast/2 clauses, which will be used for making changes to the list. Since clients making changes don’t need a response, we use handle_cast rather than handle_call, which is meant for requests that expect a reply:
```elixir
def handle_call( :list_users, _from, users ) do
  { :reply, users, users }
end

def handle_cast( { :add_user, new_user }, users ) do
  { :noreply, add_user( users, new_user ) }
end

def handle_cast( { :del_user, user }, users ) do
  { :noreply, del_user( users, user ) }
end
```
In OTP, each handle_* function returns a tuple whose first element tells the server what to do (:reply or :noreply), followed by some values. handle_cast returns a 2-element tuple, { :noreply, new_state }, because a cast never sends a response. handle_call returns a 3-element tuple, { :reply, response, new_state }, where the second value is the response sent back to the caller and the third value is the updated state of the app.
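For readers on a current Elixir release, where GenServer.Behaviour has been replaced by `use GenServer`, here is a rough sketch of the same UserStore. The module name UserStoreSketch, the registered name :user_store_sketch and the client wrappers list_users/0, add_user/1 and del_user/1 are hypothetical additions; the handler return tuples are the ones described above. It also swaps Enum.any?/2 for the equivalent `in` operator.

```elixir
defmodule UserStoreSketch do
  use GenServer

  # Register under a local name, like {:local, :user_store} in the post.
  def start_link(users) do
    GenServer.start_link(__MODULE__, users, name: :user_store_sketch)
  end

  # Hypothetical client API: call waits for a reply, cast returns :ok at once.
  def list_users, do: GenServer.call(:user_store_sketch, :list_users)
  def add_user(name), do: GenServer.cast(:user_store_sketch, {:add_user, name})
  def del_user(name), do: GenServer.cast(:user_store_sketch, {:del_user, name})

  @impl true
  def init(users), do: {:ok, users}

  # {:reply, response, new_state}: send the list back, keep the state as is.
  @impl true
  def handle_call(:list_users, _from, users), do: {:reply, users, users}

  # {:noreply, new_state}: a cast never sends a response.
  @impl true
  def handle_cast({:add_user, name}, users) do
    if name in users, do: {:noreply, users}, else: {:noreply, [name | users]}
  end

  def handle_cast({:del_user, name}, users) do
    {:noreply, List.delete(users, name)}
  end
end
```

Calling UserStoreSketch.add_user("spike") and then UserStoreSketch.list_users() from the same process returns ["spike"]: the cast is asynchronous, but messages between one pair of processes arrive in the order they were sent, so the call is handled after the cast.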
That is it for the UserStore. Now we need to create an OTP app to store who is subscribing to changes in the UserStore.
Creating the OTP subscriber list
Just like with the UserStore, the SubscriberStore begins its life as an OTP app skeleton. This module will store a list of pids that we’ll message to notify of any changes in the UserStore.
Create a file containing the following at lib/elixir_webtest/subscriber_store.ex:
```elixir
defmodule ElixirWebtest.SubscriberStore do
  use GenServer.Behaviour

  def start_link( subscribers ) do
    :gen_server.start_link({:local, :subscriber_store}, __MODULE__, subscribers, [])
  end

  def init( subscribers ) do
    { :ok, subscribers }
  end
end
```
This OTP module is slightly different from the UserStore. Although they both store a list of something, their usage patterns let us make this module a little less complex. The UserStore is modified based on client requests, whereas the pids added to this module are managed by Dynamo itself. Each incoming request is handled by its own process with its own pid, so there’s no risk of duplication (provided our app is bug-free).
So with that said, we create 2 functions to manage the state:
```elixir
defp remove_subscriber( subscribers, subscriber ) do
  List.delete subscribers, subscriber
end

defp add_subscriber( subscribers, new_subscriber ) do
  [new_subscriber|subscribers]
end
```
Everything there should be very straightforward; now we just need to create the handlers:
```elixir
def handle_cast( { :add, new_subscriber }, subscribers ) do
  { :noreply, add_subscriber(subscribers, new_subscriber) }
end

def handle_cast( { :del, subscriber }, subscribers ) do
  { :noreply, remove_subscriber( subscribers, subscriber ) }
end
```
There’s one more function we need to add, and that’s the broadcast handler. This will take a supplied message and spit it out to all pids in our subscribers list:
```elixir
def handle_cast( { :broadcast, event }, subscribers ) do
  Enum.each subscribers, fn(sub) ->
    send( sub, event )
  end

  { :noreply, subscribers }
end
```
All we’re doing is iterating over subscribers and using send/2 to send the supplied event to each pid. In this case, event will be a tuple containing an action (either :add or :del) and a user. This will be JSONified and sent to the client that’s connected to our EventSource.
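Here is a comparable sketch of SubscriberStore in current Elixir (`use GenServer` instead of GenServer.Behaviour). The module name SubscriberStoreSketch and the registered name :subscriber_store_sketch are hypothetical; the message shapes ({ :add, pid }, { :del, pid }, { :broadcast, event }) follow the post. Subscribing the calling process with self() is an easy way to watch a broadcast arrive.

```elixir
defmodule SubscriberStoreSketch do
  use GenServer

  def start_link(subscribers) do
    GenServer.start_link(__MODULE__, subscribers, name: :subscriber_store_sketch)
  end

  @impl true
  def init(subscribers), do: {:ok, subscribers}

  # Each Dynamo request has its own pid, so no duplicate check is needed.
  @impl true
  def handle_cast({:add, pid}, subscribers), do: {:noreply, [pid | subscribers]}
  def handle_cast({:del, pid}, subscribers), do: {:noreply, List.delete(subscribers, pid)}

  # Fan the event out to every subscriber pid; the state itself is unchanged.
  def handle_cast({:broadcast, event}, subscribers) do
    Enum.each(subscribers, fn pid -> send(pid, event) end)
    {:noreply, subscribers}
  end
end
```

In iex, casting {:add, self()} and then {:broadcast, {:add, "yay"}} to :subscriber_store_sketch should leave {:add, "yay"} in the iex process's mailbox, which flush() will print.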
Broadcasting changes
We now want to add our call to :broadcast to our UserStore. So update the add_user and del_user functions in UserStore (lib/elixir_webtest/user_store.ex) to the following:
```elixir
defp add_user( users, new_user ) do
  if Enum.any?( users, fn(x) -> x == new_user end ) do
    users
  else
    :gen_server.cast :subscriber_store, { :broadcast, { :add, new_user } }
    [new_user|users]
  end
end

defp del_user( users, user ) do
  if Enum.any?( users, fn(x) -> x == user end ) do
    :gen_server.cast :subscriber_store, { :broadcast, { :del, user } }
    List.delete( users, user )
  else
    users
  end
end
```
The line containing :gen_server.cast :subscriber_store, { :broadcast, { :del, user } } sends the :broadcast message to our SubscriberStore, where it gets picked up by our handler, and { :del, user } is sent to each pid in subscribers. The add_user/2 version does the same with { :add, new_user }.
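To check that only real changes are announced, here is a sketch wiring two small servers together in current Elixir. DemoSubscribers and DemoUsers (and their registered names) are hypothetical stand-ins for SubscriberStore and UserStore, trimmed to the messages needed here; the `in` operator replaces Enum.any?/2.

```elixir
defmodule DemoSubscribers do
  use GenServer
  def start_link(_), do: GenServer.start_link(__MODULE__, [], name: :demo_subscribers)

  @impl true
  def init(subs), do: {:ok, subs}

  @impl true
  def handle_cast({:add, pid}, subs), do: {:noreply, [pid | subs]}

  def handle_cast({:broadcast, event}, subs) do
    Enum.each(subs, &send(&1, event))
    {:noreply, subs}
  end
end

defmodule DemoUsers do
  use GenServer
  def start_link(users), do: GenServer.start_link(__MODULE__, users, name: :demo_users)

  @impl true
  def init(users), do: {:ok, users}

  @impl true
  def handle_cast({:add_user, user}, users) do
    if user in users do
      # duplicate: nothing changes, so nothing is broadcast
      {:noreply, users}
    else
      GenServer.cast(:demo_subscribers, {:broadcast, {:add, user}})
      {:noreply, [user | users]}
    end
  end

  def handle_cast({:del_user, user}, users) do
    if user in users do
      GenServer.cast(:demo_subscribers, {:broadcast, {:del, user}})
      {:noreply, List.delete(users, user)}
    else
      # unknown user: nothing to announce
      {:noreply, users}
    end
  end
end
```

Adding the same user twice and deleting it twice should deliver exactly two messages to a subscriber, {:add, "yay"} followed by {:del, "yay"}; the duplicate add and the delete of a missing user change nothing, so nothing is broadcast. Because the broadcast is a cast, the user store never waits on the subscriber store.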
Now we have 2 OTP modules that persist some data and that we can interact with from our Dynamo.
Getting it up and running
The last step before we can work on our routes is to configure the app to boot our OTP modules when it comes up. This is an extra layer of customization, so we’ll build a Supervisor and have the application boot it first; it will start up UserStore and SubscriberStore and also bring up the Dynamo.
First, create a file at lib/elixir_webtest/supervisor.ex:
```elixir
defmodule ElixirWebtest.Supervisor do
  use Supervisor.Behaviour

  def start_link( user_list ) do
    :supervisor.start_link(__MODULE__, user_list )
  end

  def init( user_list ) do
    children = [
      worker(ElixirWebtest.SubscriberStore, [[]]),
      worker(ElixirWebtest.UserStore, [ user_list ]),
      supervisor(ElixirWebtest.Dynamo, [])
    ]

    supervise children, strategy: :one_for_one
  end
end
```
This is using the Supervisor.Behaviour module, which contains helpers for booting our OTP modules as well as some initialization code. The start_link/1 function is how we start up the Supervisor; it takes an argument that we pass on to :supervisor.start_link/2. After some magic, that triggers our init/1 function, passing it the initialization value. The great thing about this is that we can have a default user_list (or a default anything, for that matter, if we edit the code a bit).
The init/1 function is where we declare our 2 OTP workers, SubscriberStore and UserStore, initializing them with an empty list and user_list, respectively, and declare our Dynamo supervisor. We then call supervise/2, which builds the specification the supervisor uses to start the whole thing up, with the :one_for_one strategy (if a child crashes, only that child is restarted).
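Supervisor.Behaviour, worker/2 and supervise/2 are gone from current Elixir; the same tree is now written with `use Supervisor`, {Module, arg} child tuples and Supervisor.init/2. Here is a sketch under that assumption, with hypothetical module and registered names, and with the Dynamo child left out to keep the sketch self-contained.

```elixir
defmodule SupSubscriberStore do
  use GenServer
  def start_link(subs), do: GenServer.start_link(__MODULE__, subs, name: :sup_subscriber_store)

  @impl true
  def init(subs), do: {:ok, subs}
end

defmodule SupUserStore do
  use GenServer
  def start_link(users), do: GenServer.start_link(__MODULE__, users, name: :sup_user_store)

  @impl true
  def init(users), do: {:ok, users}

  @impl true
  def handle_call(:list_users, _from, users), do: {:reply, users, users}
end

defmodule SketchSupervisor do
  use Supervisor

  def start_link(user_list), do: Supervisor.start_link(__MODULE__, user_list)

  @impl true
  def init(user_list) do
    children = [
      # {Module, arg} starts Module.start_link(arg), like worker(Module, [arg])
      {SupSubscriberStore, []},
      {SupUserStore, user_list}
    ]

    # :one_for_one restarts only the child that crashed
    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

Killing the user store, for example with Process.exit(Process.whereis(:sup_user_store), :kill), makes the supervisor start a fresh one. The restarted child receives its original start argument again, so any users added after boot are lost; the UserStore in this post behaves the same way.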
The last bit to change is in lib/elixir_webtest.ex, where we start the app with our custom Supervisor rather than the built-in Dynamo one. We change the start/2 function to the following:
```elixir
def start(_type, _args) do
  ElixirWebtest.Supervisor.start_link([])
end
```
That should be all it takes to get this working. The application should boot fine, although it’ll be kinda boring. We need to add our routes!
Interacting with the UserList
The first routes we will create allow us to add and remove users from the UserList via a simple HTTP GET request. I chose GET for this because it made it easier to test in the browser, but this should probably be a POST in the future:
```elixir
get "/api/login/:name" do
  :gen_server.cast( :user_store, { :add_user, conn.params[:name] } )
  redirect conn, to: "/users"
end

get "/api/logout/:name" do
  :gen_server.cast( :user_store, { :del_user, conn.params[:name] } )
  redirect conn, to: "/users"
end
```
When someone does a GET to /api/login/spike, Dynamo will hit our :user_store OTP module, pass it { :add_user, "spike" } and then redirect to /users. Assuming the app was just started, our UserStore should contain a list with one item: [ "spike" ].
Likewise, if someone does a GET to /api/logout/spike, Dynamo will do the same, except it tells the UserStore to remove “spike”, and we’ll be left with an empty list.
This is kinda boring though. Let’s make things more fun. Time to add streaming.
Streaming data to subscribers
What we’re going to do is enable someone to do a GET to /user-stream, and we’ll send them chunked data for an EventSource consumer. We’ll add the pid of the connection to SubscriberStore, listen for messages, and send JSON over to the client describing each change.
The way this will happen is that we’ll define a function event_handler/1, which will accept our conn Dynamo connection object, wait for a message, handle the message, then recursively call itself.
We’ll be using await/4, which is defined inside Dynamo. await/4 lets the application sleep for a little bit until a message is received or a timeout interval expires. This allows you to control how long to wait for messages and act accordingly.
There are 2 callbacks that are passed to the function, typically called on_wake_up/2 and on_timeout/1. The on_wake_up/2 function is passed the received message and the Dynamo connection object. We’ll call our on_wake_up/2 function handle_event/2 and define several clauses of it to pattern match on the message.
For our on_timeout/1 callback, we’ll just reply with a tuple containing :timeout so we can easily ignore it. The idea is that we’ll only wait 5 seconds for a message, ignore the fact we timed out and recursively call the function again to wait 5 seconds for a message again.
Then, we’ll pattern match the result, do something with it, and recursively call event_handler/1 again.
The route and function will look like the following:
```elixir
get "/user-stream" do
  conn = conn.resp_content_type("text/event-stream")
  conn = conn.send_chunked(200)

  # add this connection's process to the subscribers
  :gen_server.cast( :subscriber_store, { :add, self } )

  event_handler conn
end

defp event_handler( conn ) do
  # wait for up to 5 seconds for a message
  result = await( conn, 5000, &handle_event(&1, &2), &on_time_out(&1) )

  case result do
    { :timeout } ->
      # this is returned from the on_time_out/1 function below
      # ignore timeouts for now and keep recursing.
      event_handler( conn )
    { :ok, _ } ->
      # normal operation; conn.chunk returns { :ok, _something }
      event_handler( conn )
    { :error, :closed } ->
      # my event stream connection closed
      # so delete self from the SubscriberStore and terminate
      :gen_server.cast( :subscriber_store, { :del, self } )
      conn
    _ ->
      # anything else, just ignore it and recurse
      event_handler( conn )
  end
end

defp handle_event( { :add, user }, conn ) do
  send_chunk conn, [ action: "add", user: user ]
end

defp handle_event( { :del, user }, conn ) do
  send_chunk conn, [ action: "del", user: user ]
end

defp handle_event( msg, _conn ) do
  msg
end

defp on_time_out( _a ) do
  { :timeout }
end
```
That’s a lot to take in. I tried to keep comments in there to explain what’s going on, but it should all be pretty straightforward. The return value of the handle_event/2 functions will be returned by await/4, and that is what we pattern match on.
When conn.chunk/1 tries to send data to a disconnected client, it returns { :error, :closed }. In that case, we remove that subscriber from SubscriberStore so it won’t be broadcast to anymore and stop recursing. Under every other circumstance, we continue recursing and sending updates.
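The loop can be sketched without Dynamo using a plain receive ... after block, which plays the role of await/4 and its two callbacks. In this hypothetical EventLoopSketch, chunks are sent to a sink pid instead of an HTTP connection, and a :close message stands in for conn.chunk/1 returning { :error, :closed }; both are assumptions made so the sketch runs on its own.

```elixir
defmodule EventLoopSketch do
  # sink: hypothetical pid standing in for the HTTP connection.
  def event_handler(sink, timeout \\ 5000) do
    receive do
      {:add, user} ->
        send(sink, {:chunk, "add", user})
        event_handler(sink, timeout)

      {:del, user} ->
        send(sink, {:chunk, "del", user})
        event_handler(sink, timeout)

      :close ->
        # connection gone: stop recursing (the post also unsubscribes here)
        :closed

      _other ->
        # anything else: ignore it and keep waiting
        event_handler(sink, timeout)
    after
      timeout ->
        # no message in time: just wait again, like on_time_out/1 in the post
        event_handler(sink, timeout)
    end
  end
end
```

Each branch ends with the recursive call, so the recursion is a tail call and the loop can run for as long as the stream stays open without growing the stack; the same holds for event_handler/1 in the route above.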
There is a function above that we haven’t defined yet, and that’s send_chunk/2. This is a convenience function which I’ll show you in a second. It takes care of sending the EventSource events and encoding JSON. For that we’ll use the awesome elixir-json library (https://github.com/cblage/elixir-json).
First, let’s add the send_chunk/2 function to our file:
```elixir
defp send_chunk( conn, data ) do
  result = JSON.encode(data)

  case result do
    { :ok, json } ->
      conn.chunk "data: #{ json }\n\n"
    _ ->
      conn
  end
end
```
What that’s doing is converting the given data to JSON and, assuming it converted correctly, kicking it over the wire. According to the EventSource spec, the chunk needs to contain data: followed by the data, followed by 2 newlines (the second newline produces the blank line that tells the client to dispatch the event). When prototyping this app, I kept forgetting the newlines and wondered why the app wasn’t sending anything, so I created this function to idiot-proof it.
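The framing rule can be captured in a tiny function. This sketch (SSEFrameSketch.frame/1 is a hypothetical name) takes an already-encoded string and also handles payloads containing newlines: the EventSource format needs a separate data: line for each line of the payload, and the client joins them back together with newlines. A JSON string can never contain a raw newline (JSON requires it to be escaped), so as long as the encoder doesn’t pretty-print, send_chunk/2 above only ever produces a single data: line.

```elixir
defmodule SSEFrameSketch do
  # Prefix every payload line with "data: " and finish with a blank line,
  # which tells the EventSource client to dispatch the event.
  def frame(payload) when is_binary(payload) do
    payload
    |> String.split("\n")
    |> Enum.map_join("", fn line -> "data: " <> line <> "\n" end)
    |> append_blank_line()
  end

  defp append_blank_line(frame), do: frame <> "\n"
end

IO.inspect(SSEFrameSketch.frame(~s({"action":"add","user":"yay"})))
# prints "data: {\"action\":\"add\",\"user\":\"yay\"}\n\n"
```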
The last step to get this working is to update the deps in mix.exs to use the elixir-json library. Make your deps function match the following:
```elixir
defp deps do
  [ { :cowboy, github: "extend/cowboy" },
    { :dynamo, "~> 0.1.0-dev", github: "elixir-lang/dynamo" },
    { :json, github: "cblage/elixir-json", tag: "v0.2.7" }
  ]
end
```
Now, you should be able to run mix deps.get and then mix server and be good to go.
To test this out, open up one terminal window and execute:

```shell
curl localhost:4000/user-stream
```

This will spit out events in real-time to your terminal.
In another window, run:

```shell
curl localhost:4000/api/login/yay
```

You should see data: {"action":"add","user":"yay"} appear in the curl output.
Next, run:

```shell
curl localhost:4000/api/logout/yay
```

And you should instantly see data: {"action":"del","user":"yay"} appear in the curl output.
Cool, huh?
Source
I’ve got the source to the demo app on my GitHub at the following location: https://github.com/spikegrobstein/elixir_webtest
This source also contains some HTML frontend stuff with JS goodness for realtime DOM updates and additional comments and documentation.
If anyone follows along and feels that I missed anything or anything wasn’t clear, let me know and I’ll make any necessary corrections.