Caffeinated Bitstream

Bits, bytes, and words.

Rust

Implementing SCTP to support WebRTC data channels in pure Rust

I recently gave a talk at the Denver Rust Meetup about my side project to implement WebRTC data channels in Rust. I'll probably blog about this project in more detail in the future, but in the meantime here are the slides and a short demo video.

I also made the following informal "studio" video of the talk. The live version was probably better, but hopefully this is good enough to get the points across.

Tokio internals: Understanding Rust's asynchronous I/O framework from the bottom up

Tokio is a Rust framework for developing applications which perform asynchronous I/O — an event-driven approach that can often achieve better scalability, performance, and resource usage than conventional synchronous I/O. Unfortunately, Tokio is notoriously difficult to learn due to its sophisticated abstractions. Even after reading the tutorials, I didn't feel that I had internalized the abstractions sufficiently to be able to reason about what was actually happening.

My prior experience with asynchronous I/O programming may have even hindered my Tokio education. I'm accustomed to using the operating system's selection facility (e.g. Linux epoll) as a starting point, and then moving on to dispatch, state machines, and so forth. Starting with the Tokio abstractions with no clear insight into where and how the underlying epoll_wait() happens, I found it difficult to connect all the dots. Tokio and its future-driven approach felt like something of a black box.

Instead of continuing with a top-down approach to learning Tokio, I decided to take a bottom-up approach by studying the source code to understand exactly how the current concrete implementation drives the progression from epoll events to I/O consumption within a Future::poll(). I won't go into great detail about the high-level usage of Tokio and futures, as that is better covered in the existing tutorials. I'm also not going to discuss the general problem of asynchronous I/O beyond a short summary, since entire books could be written on the subject. My goal is simply to have some confidence that futures and Tokio's polling work the way I expect.

First, some important disclaimers. Note that Tokio is actively being developed, so some of the observations here may quickly become out-of-date. For the purposes of this study, I used tokio-core 0.1.10, futures 0.1.17, and mio 0.6.10. Since I wanted to understand Tokio at its lowest levels, I did not consider higher-level crates like tokio-proto and tokio-service. The tokio-core event system itself has a lot of moving pieces, most of which I avoid discussing in the interest of brevity. I studied Tokio on a Linux system, and some of the discussion necessarily touches on platform-dependent implementation details such as epoll. Finally, everything mentioned here is my interpretation as a newcomer to Tokio, so there could be errors or misunderstandings.

Asynchronous I/O in a nutshell

Synchronous I/O programming involves performing I/O operations which block until completion. Reads will block until data arrives, and writes will block until the outgoing bytes can be delivered to the kernel. This fits nicely with conventional imperative programming, where a series of steps are executed one after the other. For example, consider an HTTP server that spawns a new thread for each connection. On this thread, it may read bytes until an entire request is received (blocking as needed until all bytes arrive), process the request, and then write the response (blocking as needed until all bytes are written). This is a very straightforward approach. The downside is that a distinct thread is needed for each connection due to the blocking, each with its own stack. In many cases this is fine, and synchronous I/O is the correct approach. However, the thread overhead hinders scalability on servers trying to handle a very large number of connections (see: the C10k problem), and may also be excessive on low-resource systems handling a few connections.

If our HTTP server was written to use asynchronous I/O, on the other hand, it might perform all I/O processing on a single thread. All active connections and the listening socket would be configured as non-blocking, monitored for read/write readiness in an event loop, and execution would be dispatched to handlers as events occur. State and buffers would need to be maintained for each connection. If a handler is only able to read 100 bytes of a 200-byte request, it cannot wait for the remaining bytes to arrive, since doing so would prevent other connections from making progress. It must instead store the partial read in a buffer, keep the state set to "reading request", and return to the event loop. The next time the handler is called for this connection, it may read the remainder of the request and transition to a "writing response" state. Implementing such a system can become hairy very fast, with complex state machines and error-prone resource management.
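
To make the bookkeeping concrete, here is a minimal, hypothetical sketch of the per-connection state such a hand-rolled server might track. The Connection and on_readable names are mine, not from any particular library, and only the read path is shown.

use std::io::{ErrorKind, Read};
use std::net::TcpStream;

// Hypothetical per-connection bookkeeping for a hand-rolled non-blocking server.
enum State {
    ReadingRequest,
    WritingResponse,
}

struct Connection {
    socket: TcpStream, // assumed to have been set to non-blocking elsewhere
    state: State,
    request: Vec<u8>,  // partial request bytes accumulated so far
}

impl Connection {
    // Called by the event loop whenever this socket is reported read-ready.
    fn on_readable(&mut self) {
        let mut buf = [0u8; 4096];
        loop {
            match self.socket.read(&mut buf) {
                Ok(n) if n > 0 => {
                    // Append whatever arrived; we may still be mid-request.
                    self.request.extend_from_slice(&buf[..n]);
                    if self.request.windows(4).any(|w| w == b"\r\n\r\n") {
                        // Full request received; switch state so that later
                        // write-ready events produce the response.
                        self.state = State::WritingResponse;
                        return;
                    }
                }
                // Peer closed the connection.
                Ok(_) => return,
                // No more data right now: keep the partial request buffered
                // and return to the event loop without blocking.
                Err(ref e) if e.kind() == ErrorKind::WouldBlock => return,
                // Treat any other error as fatal for this connection.
                Err(_) => return,
            }
        }
    }
}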

The ideal asynchronous I/O framework would provide a means of writing such I/O processing steps one after the other, as if they were blocking, but behind the scenes generate an event loop and state machines. That's a tough goal in most languages, but Tokio brings us pretty close.

The Tokio stack

The Tokio stack consists of the following components:

  1. The system selector. Each operating system provides a facility for receiving I/O events, such as epoll (Linux), kqueue() (FreeBSD/Mac OS), or IOCP (Windows).
  2. Mio - Metal I/O. Mio is a Rust crate that provides a common API for low-level I/O by handling the specifics of each operating system's selector internally, so you don't have to.
  3. Futures. Futures provide a powerful abstraction for representing things that have yet to happen. These representations can be combined in useful ways to create composite futures describing a complex sequence of events. This abstraction is general enough to be used for many things besides I/O, but in Tokio we develop our asynchronous I/O state machines as futures.
  4. Tokio. The tokio-core crate provides the central event loop which integrates with Mio to respond to I/O events, and drives futures to completion.
  5. Your program. A program using the Tokio framework can construct asynchronous I/O systems as futures, and provide them to the Tokio event loop for execution.

Mio: Metal I/O

Mio provides a low-level I/O API allowing callers to receive events such as socket read/write readiness changes. The highlights are:

  1. Poll and Evented. Mio supplies the Evented trait to represent anything that can be a source of events. In your event loop, you register a number of Evented's with a mio::Poll object, then call mio::Poll::poll() to block until events have occurred on one or more Evented objects (or the specified timeout has elapsed).
  2. System selector. Mio provides cross-platform access to the system selector, so that Linux epoll, Windows IOCP, FreeBSD/Mac OS kqueue(), and potentially others can all be used with the same API. The overhead required to adapt the system selector to the Mio API varies. Because Mio provides a readiness-based API similar to Linux epoll, many parts of the API can be one-to-one mappings when using Mio on Linux. (For example, mio::Events essentially is an array of struct epoll_event.) In contrast, because Windows IOCP is completion-based instead of readiness-based, a bit more adaptation is required to bridge the two paradigms. Mio supplies its own versions of std::net structs such as TcpListener, TcpStream, and UdpSocket. These wrap the std::net versions, but default to non-blocking and provide Evented implementations which add the socket to the system selector.
  3. Non-system events. In addition to providing readiness of I/O sources, Mio can also indicate readiness events generated in user-space. For example, if a worker thread finishes a unit of work, it can signal completion to the event loop thread. Your program calls Registration::new2() to obtain a (Registration, SetReadiness) pair. The Registration object is an Evented which can be registered with Mio in your event loop, and set_readiness() can be called on the SetReadiness object whenever readiness needs to be indicated. On Linux, non-system event notifications are implemented using a pipe. When SetReadiness::set_readiness() is called, a 0x01 byte is written to the pipe. mio::Poll's underlying epoll is configured to monitor the reading end of the pipe, so epoll_wait() will unblock and Mio can deliver the event to the caller. Exactly one pipe is created when Poll is instantiated, regardless of how many (if any) non-system events are later registered.

Every Evented registration is associated with a caller-provided usize value typed as mio::Token, and this value is returned with events to indicate the corresponding registration. This maps nicely to the system selector in the Linux case, since the token can be placed in the 64-bit epoll_data union which functions in the same way.
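
For illustration, here is a minimal sketch of the non-system event mechanism against mio 0.6. The Token value and the 100 ms delay are arbitrary, and error handling is reduced to unwrap() for brevity.

extern crate mio;

use mio::{Events, Poll, PollOpt, Ready, Registration, Token};
use std::thread;
use std::time::Duration;

fn main() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(16);

    // Obtain a (Registration, SetReadiness) pair and register the
    // Registration just like any other Evented, keyed by a token.
    let (registration, set_readiness) = Registration::new2();
    poll.register(&registration, Token(42), Ready::readable(), PollOpt::edge())
        .unwrap();

    // Another thread signals readiness from user-space; on Linux this writes
    // a byte into Mio's internal pipe so that epoll_wait() wakes up.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        set_readiness.set_readiness(Ready::readable()).unwrap();
    });

    // The event is delivered with the token supplied at registration time.
    poll.poll(&mut events, None).unwrap();
    for event in &events {
        println!("non-system event with token {:?}", event.token());
    }
}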

To provide a concrete example of Mio operation, here's what happens internally when we use Mio to monitor a UDP socket on a Linux system:

  1. Create the socket.
    let socket = mio::net::UdpSocket::bind(
        &SocketAddr::new(
            std::net::IpAddr::V4(std::net::Ipv4Addr::new(127,0,0,1)),
            2000
        )
    ).unwrap();

    This creates a Linux UDP socket, wrapped in a std::net::UdpSocket, which itself is wrapped in a mio::net::UdpSocket. The socket is set to be non-blocking.

  2. Create the poll.
    let poll = mio::Poll::new().unwrap();

    Mio initializes the system selector, readiness queue (for non-system events), and concurrency protection. The readiness queue initialization creates a pipe so readiness can be signaled from user-space, and the pipe's read file descriptor is added to the epoll. When a Poll object is created, it is assigned a unique selector_id from an incrementing counter.

  3. Register the socket with the poll.
    poll.register(
        &socket,
        mio::Token(0),
        mio::Ready::readable(),
        mio::PollOpt::level()
    ).unwrap();

    The UdpSocket's Evented.register() function is called, which proxies to a contained EventedFd that adds the socket's file descriptor to the poll selector (ultimately by calling epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epoll_event), where epoll_event.data is set to the provided token value). When a UdpSocket is registered, its selector_id is set to the Poll's, thus associating it with the selector.

  4. Call poll() in an event loop.
    // The events container must be created before the loop.
    let mut events = mio::Events::with_capacity(1024);
    loop {
        poll.poll(&mut events, None).unwrap();
        for event in &events {
            handle_event(event);
        }
    }

    The system selector (epoll_wait()) and then the readiness queue are polled for new events. (The epoll_wait() blocks, but because non-system events trigger epoll via the pipe in addition to pushing to the readiness queue, they will still be processed in a timely manner.) The combined set of events are made available to the caller for processing.

Futures and Tasks

Futures are a technique borrowed from functional programming whereby computation that has yet to happen can be represented as a "future", and these individual futures can be combined to develop complex systems. This is useful for asynchronous I/O because the basic steps needed to perform transactions can be modeled as such combined futures. In the HTTP server example, one future may read a request by reading bytes as they become available until the end of the request is reached, at which time a "Request" object is yielded. Another future may process a request and yield a response, and yet another future may write responses.

In Rust, futures are implemented in the futures crate. You can define a future by implementing the Future trait, which requires a poll() method which is called as needed to allow the future to make progress. This method returns either an error, an indication that the future is still pending thus poll() should be called again later, or a yielded value if the future has reached completion. The Future trait also provides a great many combinators as default methods.

To understand futures, it is crucial to understand tasks, executors, and notifications — and how they arrange for a future's poll() method to be called at the right time. Every future is executed within a task context. A task itself is directly associated with exactly one future, but this future may be a composite future that drives many contained futures. (For example, multiple futures joined into a single future using the join_all() combinator, or two futures executed in series using the and_then() combinator.)
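
As a small illustration of combinators in futures 0.1, here the results are driven by the blocking wait() executor discussed just below:

extern crate futures;

use futures::Future;
use futures::future;

fn main() {
    // Chain two steps in series with and_then().
    let chained = future::ok::<u32, ()>(2).and_then(|n| future::ok(n * 10));

    // Combine several futures of the same type into one with join_all().
    let joined = future::join_all(vec![future::ok::<u32, ()>(1), future::ok(2), future::ok(3)]);

    // wait() drives a future to completion on the current thread using the
    // futures crate's simple built-in executor.
    assert_eq!(chained.wait(), Ok(20));
    assert_eq!(joined.wait(), Ok(vec![1, 2, 3]));
}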

Tasks and their futures require an executor to run. An executor is responsible for polling the task/future at the correct times — usually when it has been notified that progress can be made. Such a notification happens when some other code calls the notify() method of the provided object implementing the futures::executor::Notify trait. An example of this can be seen in the extremely simple executor provided by the futures crate that is invoked when calling the wait() method on a future. From the source code:

/// Waits for the internal future to complete, blocking this thread's
/// execution until it does.
///
/// This function will call `poll_future` in a loop, waiting for the future
/// to complete. When a future cannot make progress it will use
/// `thread::park` to block the current thread.
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
    ThreadNotify::with_current(|notify| {

        loop {
            match self.poll_future_notify(notify, 0)? {
                Async::NotReady => notify.park(),
                Async::Ready(e) => return Ok(e),
            }
        }
    })
}

Given a futures::executor::Spawn object previously created to fuse a task and future, this executor calls poll_future_notify() in a loop. The provided Notify object becomes part of the task context and the future is polled. If a future's poll() returns Async::NotReady indicating that the future is still pending, it must arrange to be polled again in the future. It can obtain a handle to its task via futures::task::current() and call the notify() method whenever the future can again make progress. (Whenever a future is being polled, information about its associated task is stored in a thread-local which can be accessed via current().) In the above case, if the poll returns Async::NotReady, the executor will block until the notification is received. Perhaps the future starts some work on another thread which will call notify() upon completion, or perhaps the poll() itself calls notify() directly before returning Async::NotReady. (The latter is not common, since theoretically a poll() should continue making progress, if possible, before returning.)
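
To see these pieces working together, here is a minimal hand-written future (against futures 0.1) that stores a handle to its task and has a worker thread call notify() when the result is ready. The Background type and its fields are my own invention for illustration, not anything from the futures crate.

extern crate futures;

use futures::{Async, Future, Poll};
use futures::task;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// A toy future that completes once a worker thread has produced a value.
struct Background {
    result: Arc<Mutex<Option<u32>>>,
    started: bool,
}

impl Future for Background {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<u32, ()> {
        if !self.started {
            self.started = true;
            // Grab a handle to the current task so the worker thread can
            // notify it, causing this poll() to be called again later.
            let task = task::current();
            let result = self.result.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(100));
                *result.lock().unwrap() = Some(42);
                task.notify();
            });
        }
        match *self.result.lock().unwrap() {
            Some(value) => Ok(Async::Ready(value)),
            None => Ok(Async::NotReady),
        }
    }
}

fn main() {
    let future = Background { result: Arc::new(Mutex::new(None)), started: false };
    // wait() uses the simple thread-parking executor shown above.
    println!("result: {}", future.wait().unwrap());
}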

The Tokio event loop acts as a much more sophisticated executor that integrates with Mio events to drive futures to completion. In this case, a Mio event indicating socket readiness will result in a notification that causes the corresponding future to be polled.

Tasks are the basic unit of execution when dealing with futures, and are essentially green threads providing a sort of cooperative multitasking, allowing multiple execution contexts on one operating system thread. When one task is unable to make progress, it will yield the processor to other runnable tasks. It is important to understand that notifications happen at the task level and not the future level. When a task is notified, it will poll its top-level future, which may result in any or all of the child futures (if present) being polled. For example, if a task's top-level future is a join_all() of ten other futures, and one of these futures arranges for the task to be notified, all ten futures will be polled whether they need it or not.

Tokio's interface with Mio

Tokio converts task notifications into Mio events by using Mio's "non-system events" feature described above. After obtaining a Mio (Registration, SetReadiness) pair for the task, it registers the Registration (which is an Evented) with Mio's poll, then wraps the SetReadiness object in a MySetReadiness which implements the Notify trait. From the source code:

struct MySetReadiness(mio::SetReadiness);

impl Notify for MySetReadiness {
    fn notify(&self, _id: usize) {
        self.0.set_readiness(mio::Ready::readable())
              .expect("failed to set readiness");
    }
}

In this way, task notifications are converted into Mio events, and can be processed in Tokio's event handling and dispatch code along with other types of Mio events.

Just as Mio wraps std::net structs such as UdpSocket, TcpListener, and TcpStream to customize functionality, Tokio also uses composition and decoration to provide Tokio-aware versions of these types. For example, Tokio's UdpSocket looks something like this:
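
(A rough sketch of the layering; the field name is approximate rather than copied from the tokio-core source.)

// Approximate composition, not the literal tokio-core definition:
pub struct UdpSocket {
    io: PollEvented<mio::net::UdpSocket>, // adds readiness tracking and task notification
}
// ...where mio::net::UdpSocket in turn wraps a non-blocking std::net::UdpSocket.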

Tokio's versions of these I/O source types provide constructors that require a handle to the event loop (tokio_core::reactor::Handle). When instantiated, these types will register their sockets with the event loop's Mio poll to receive edge-triggered events with a newly assigned even-numbered token. (More on this below.) Conveniently, these types will also arrange for the current task to be notified of read/write readiness whenever the underlying I/O operation returns WouldBlock.

Tokio registers several types of Evented's with Mio, keyed to specific tokens:

  • Token 0 (TOKEN_MESSAGES) is used for Tokio's internal message queue, which provides a means of removing I/O sources, scheduling tasks to receive read/write readiness notifications, configuring timeouts, and running arbitrary closures in the context of the event loop. This can be used to safely communicate with the event loop from other threads. For example, Remote::spawn() marshals the future to the event loop via the message system.

    The message queue is implemented as a futures::sync::mpsc stream. As a futures::stream::Stream (which is similar to a future, except it yields a sequence of values instead of a single value), the processing of this message queue is performed using the MySetReadiness scheme mentioned above, where the Registration is registered with the TOKEN_MESSAGES token. When TOKEN_MESSAGES events are received, they are dispatched to the consume_queue() method for processing. (Source: enum Message, consume_queue())

  • Token 1 (TOKEN_FUTURE) is used to notify Tokio that the main task needs to be polled. This happens when a notification occurs which is associated with the main task. (In other words, the future passed to Core::run() or a child thereof, not a future running in a different task via spawn().) This also uses a MySetReadiness scheme to translate future notifications into Mio events. Before a future running in the main task returns Async::NotReady, it will arrange for a notification to be sent later in a manner of its choosing. When the resulting TOKEN_FUTURE event is received, the Tokio event loop will re-poll the main task.

  • Even-numbered tokens greater than 1 (TOKEN_START+key*2) are used to indicate readiness changes on I/O sources. The key is the Slab key for the associated Core::inner::io_dispatch Slab<ScheduledIo> element. The Mio I/O source types (UdpSocket, TcpListener, and TcpStream) are registered with such a token automatically when the corresponding Tokio I/O source types are instantiated.

  • Odd-numbered tokens greater than 1 (TOKEN_START+key*2+1) are used to indicate that a spawned task (and thus its associated future) should be polled. The key is the Slab key for the associated Core::inner::task_dispatch Slab<ScheduledTask> element. As with TOKEN_MESSAGES and TOKEN_FUTURE events, these also use the MySetReadiness plumbing.
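
As an illustrative sketch (not Tokio's actual dispatch code, and assuming TOKEN_START is 2, consistent with the even/odd scheme above), the token space decodes roughly like this:

const TOKEN_MESSAGES: usize = 0;
const TOKEN_FUTURE: usize = 1;
const TOKEN_START: usize = 2;

enum Dispatch {
    Messages,            // drain the internal message queue
    MainFuture,          // poll the main task's future
    Io { key: usize },   // readiness change on io_dispatch[key]
    Task { key: usize }, // notification for task_dispatch[key]
}

fn classify(token: usize) -> Dispatch {
    match token {
        TOKEN_MESSAGES => Dispatch::Messages,
        TOKEN_FUTURE => Dispatch::MainFuture,
        t if (t - TOKEN_START) % 2 == 0 => Dispatch::Io { key: (t - TOKEN_START) / 2 },
        t => Dispatch::Task { key: (t - TOKEN_START) / 2 },
    }
}

fn main() {
    // For example, an even token of 14 would map to I/O source Slab key 6.
    match classify(14) {
        Dispatch::Io { key } => println!("I/O readiness for Slab key {}", key),
        _ => unreachable!(),
    }
}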

Tokio event loop

Tokio, specifically tokio_core::reactor::Core, provides the event loop to manage futures and tasks, drive futures to completion, and interface with Mio so that I/O events will result in the correct tasks being notified. Using the event loop involves instantiating the Core with Core::new() and calling Core::run() with a single future. The event loop will drive the provided future to completion before returning. For server applications, this future is likely to be long-lived. It may, for example, use a TcpListener to continuously accept new incoming connections, each of which may be handled by their own future running independently in a separate task created by Handle.spawn().
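
In its simplest form, using the event loop looks something like the following minimal sketch (tokio-core 0.1 and futures 0.1); a real server would pass a long-lived future to run() instead of an immediately ready one.

extern crate futures;
extern crate tokio_core;

use futures::future;
use tokio_core::reactor::Core;

fn main() {
    // Create the event loop. A Handle from core.handle() would be used to
    // register I/O sources and to spawn additional tasks; it is not needed
    // in this tiny example.
    let mut core = Core::new().unwrap();

    // run() drives the provided future to completion and returns its result.
    let value = core.run(future::ok::<u32, ()>(42)).unwrap();
    assert_eq!(value, 42);
}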

The following flow chart outlines the basic steps of the Tokio event loop:

What happens when data arrives on a socket?

A useful exercise for understanding Tokio is to examine the steps that occur within the event loop when data arrives on a socket. I was surprised to discover that this ends up being a two-part process, with each part requiring a separate epoll transaction in a separate iteration of the event loop. The first part responds to a socket becoming read-ready (i.e., a Mio event with an even-numbered token greater than one) by sending a notification to the task which is interested in the socket. The second part handles the notification (i.e., a Mio event with an odd-numbered token greater than one for a spawned task, or TOKEN_FUTURE for the main task) by polling the task and its associated future. We'll consider the steps in a scenario where a spawned future is reading from a UdpSocket on a Linux system, starting from the top of the Tokio event loop and assuming that a previous poll of the future resulted in a recv_from() returning a WouldBlock error.

The Tokio event loop calls mio::Poll::poll(), which in turn (on Linux) calls epoll_wait(), which blocks until some readiness change event occurs on one of the monitored file descriptors. When this happens, epoll_wait() returns an array of epoll_event structs describing what has occurred, which are translated by Mio into mio::Events and returned to Tokio. (On Linux, this translation should be zero-cost, since mio::Events is just a single-tuple struct wrapping an epoll_event array.) In our case, assume the only event in the array indicates read readiness on the socket. Because the event token is even and greater than one, Tokio interprets this as an I/O event, and looks up the details in the corresponding element of Slab<ScheduledIo>, which contains information on any tasks interested in read and write readiness for this socket. Tokio then notifies the reader task, which, by way of the MySetReadiness glue described earlier, calls Mio's set_readiness(). Mio handles this non-system event by adding the event details to its readiness queue, and writing a single 0x01 byte to the readiness pipe.

After the Tokio event loop moves to the next iteration, it once again polls Mio, which calls epoll_wait(), which this time returns a read readiness event occurring on Mio's readiness pipe. Mio reads the 0x01 which was previously written to the pipe, dequeues the non-system event details from the readiness queue, and returns the event to Tokio. Because the event token is odd and greater than one, Tokio interprets this as a task notification event, and looks up the details in the corresponding element of Slab<ScheduledTask>, which contains the task's original Spawn object returned from spawn(). Tokio polls the task and its future via poll_future_notify(). The future may then read data from the socket until it gets a WouldBlock error.

This two-iteration approach involving a pipe write and read may add a little overhead when compared to other asynchronous I/O event loops. In a single-threaded program, it is a bit strange to look at the strace output and see a thread using a pipe to communicate with itself:

pipe2([4, 5], O_NONBLOCK|O_CLOEXEC) = 0
...
epoll_wait(3, [{EPOLLIN|EPOLLOUT, {u32=14, u64=14}}], 1024, -1) = 1
write(5, "\1", 1) = 1
epoll_wait(3, [{EPOLLIN, {u32=4294967295, u64=18446744073709551615}}], 1024, 0) = 1
read(4, "\1", 128) = 1
read(4, 0x7ffce1140f58, 128) = -1 EAGAIN (Resource temporarily unavailable)
recvfrom(12, "hello\n", 1024, 0, {sa_family=AF_INET, sin_port=htons(43106), sin_addr=inet_addr("127.0.0.1")}, [16]) = 6
recvfrom(12, 0x7f576621c800, 1024, 0, 0x7ffce1140070, 0x7ffce114011c) = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(3, [], 1024, 0) = 0
epoll_wait(3, 0x7f5765b24000, 1024, -1) = -1 EINTR (Interrupted system call)

Mio uses this pipe scheme to support the general case where set_readiness() may be called from other threads, and perhaps it also has some benefits in forcing the fair scheduling of events and maintaining a layer of indirection between futures and I/O.

Lessons learned: Combining futures vs. spawning futures

When I first started exploring Tokio, I wrote a small program to listen for incoming data on several different UDP sockets. I created ten instances of a socket-reading future, each of them listening on a different port number. I naively joined them all into a single future with join_all(), passed the combined future to Core::run(), and was surprised to discover that every future was being polled whenever a single packet arrived. Also somewhat surprising was that tokio_core::net::UdpSocket::recv_from() (and its underlying PollEvented) was smart enough to avoid actually calling the operating system's recvfrom() on sockets that had not been flagged as read-ready in a prior Mio poll. The strace, reflecting a debug println!() in my future's poll(), looked something like this:

epoll_wait(3, [{EPOLLIN|EPOLLOUT, {u32=14, u64=14}}], 1024, -1) = 1
write(5, "\1", 1) = 1
epoll_wait(3, [{EPOLLIN, {u32=4294967295, u64=18446744073709551615}}], 1024, 0) = 1
read(4, "\1", 128) = 1
read(4, 0x7ffc183129d8, 128) = -1 EAGAIN (Resource temporarily unavailable)
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
recvfrom(12, "hello\n", 1024, 0, {sa_family=AF_INET, sin_port=htons(43106), sin_addr=inet_addr("127.0.0.1")}, [16]) = 6
getsockname(12, {sa_family=AF_INET, sin_port=htons(2006), sin_addr=inet_addr("127.0.0.1")}, [16]) = 0
write(1, "recv 6 bytes from 127.0.0.1:43106 at 127.0.0.1:2006\n", 52) = 52
recvfrom(12, 0x7f2a11c1c400, 1024, 0, 0x7ffc18312ba0, 0x7ffc18312c4c) = -1 EAGAIN (Resource temporarily unavailable)
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
write(1, "UdpServer::poll()...\n", 21) = 21
epoll_wait(3, [], 1024, 0) = 0
epoll_wait(3, 0x7f2a11c36000, 1024, -1) = ...

Since the concrete internal workings of Tokio and futures were somewhat opaque to me, I suppose I hoped there was some magic routing happening behind the scenes that would only poll the required futures. Of course, armed with a better understanding of Tokio, it's obvious that my program was using futures like this:
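
In code, roughly (a fragment, not a complete program; UdpServer is a stand-in for my hypothetical socket-reading future):

// Sketch only: UdpServer is a placeholder for a user-defined future that
// reads datagrams from one tokio_core::net::UdpSocket.
let mut core = Core::new().unwrap();
let handle = core.handle();

let mut servers = Vec::new();
for port in 2000..2010 {
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
    let socket = tokio_core::net::UdpSocket::bind(&addr, &handle).unwrap();
    servers.push(UdpServer::new(socket));
}

// One combined future running in one task: a notification for any socket
// causes the JoinAll future, and therefore every child, to be polled.
core.run(future::join_all(servers)).unwrap();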

This actually works fine, but is not optimal — especially if you have a lot of sockets. Because notifications happen at the task level, any notification arranged by any of those socket-reading futures will cause the main task to be notified. It will poll its JoinAll future, which itself will poll each of its children. What I really need is a simple main future that uses Handle::spawn() to launch each of the other futures in their own tasks, resulting in an arrangement like this:
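
Roughly, with the same hypothetical UdpServer future as above (spawned futures must have Item = () and Error = ()):

// Sketch only: each UdpServer future is spawned into its own task.
let mut core = Core::new().unwrap();
let handle = core.handle();

for port in 2000..2010 {
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
    let socket = tokio_core::net::UdpSocket::bind(&addr, &handle).unwrap();
    // A packet arriving on one socket now notifies (and polls) only the
    // task that owns that socket's future.
    handle.spawn(UdpServer::new(socket));
}

// The main future never completes; it just keeps the event loop running.
core.run(future::empty::<(), ()>()).unwrap();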

When any future arranges a notification, it will cause only the future's specific task to be notified, and only that future will be polled. (Recall that "arranging a notification" happens automatically when tokio_core::net::UdpSocket::recv_from() receives WouldBlock from its underlying mio::net::UdpSocket::recv_from() call.) Future combinators are powerful tools for describing protocol flow that would otherwise be implemented in hand-rolled state machines, but it's important to understand where your design may need to support separate tasks that can make progress independently and concurrently.

Final thoughts

Studying the source code of Tokio, Mio, and futures has really helped solidify my comprehension of Tokio, and validates my strategy of clarifying abstractions through the understanding of their concrete implementations. This approach could pose a danger of only learning narrow use cases for the abstractions, so we must consciously consider the concretes as only being examples that shed light on the general cases. Reading the Tokio tutorials after studying the source code, I find myself with a bit of a hindsight bias: Tokio makes sense, and should have been easy to understand to begin with!

I still have a few remaining questions that I'll have to research some other day:

  • Does Tokio deal with the starvation problem of edge triggering? I suppose it could be handled within the future by limiting the number of read/writes in a single poll(). When the limit is reached, the future could return early after explicitly notifying the current task instead of relying on the implicit "schedule-on-WouldBlock" behavior of the Tokio I/O source types, thus allowing other tasks and futures a chance to make progress.
  • Does Tokio support any way of running the event loop itself on multiple threads, instead of relying on finding opportunities to offload work to worker threads to maximize use of processor cores?

UPDATE 2017-12-19: There is a Reddit thread on r/rust discussing this post. Carl Lerche, author of Mio, has posted some informative comments here and here. In addition to addressing the above questions, he notes that FuturesUnordered provides a means of combining futures such that only the relevant child futures will be polled, thus avoiding polling every future as join_all() would, with the tradeoff of additional allocations. Also, a future version of Tokio will be migrating away from the mio::Registration scheme for notifying tasks, which could streamline some of the steps described earlier.

UPDATE 2017-12-21: It looks like Hacker News also had a discussion of this post.

UPDATE 2018-01-26: I created a GitHub repository for my Tokio example code.

Cursive: Writing terminal applications in Rust

As a learning exercise to sharpen my Rust programming skills, I recently toyed with writing a small program that uses a terminal-based user interface which I built using the Cursive crate developed by Alexandre Bury. Cursive provides a high-level framework for building event-driven terminal applications using visual components such as menu bars, text areas, lists, dialog boxes, etc. Conceptually, developing with Cursive is one level of abstraction higher than using a library such as ncurses, which provides a more raw interface to managing screen contents and translating updates to the terminal's native language. In fact, Cursive defaults to using ncurses as one of several possible backends, and allows setting themes to customize various text colors and styles.

Why write terminal applications?

In today's software world, no one writes terminal applications expecting them to be a hit with the masses. Graphical applications (e.g. desktop apps or web apps) provide a uniquely intuitive interface model that allows users to quickly become productive with a minimal learning curve, offer a high-bandwidth flow of information to the user, and remain the only reasonable solution for many problem categories. Many applications would simply not be possible or practical without a GUI. However, terminal programs can find a niche audience in technical users such as software developers and system administrators who are often in need of utilities that are a bit more two-dimensional than the command line's standard input and output, but retain the flexibility to be easily used remotely or on devices of limited capability.

Also, terminal apps are often extremely fast — fast enough to maintain the illusion of the computer being an extension of the mind. I find it frustrating that in 2017 I still spend plenty of time waiting for the computer to do something. Occasionally even typing into a text field in a web browser is laggy on my high-end late-model iMac. For every extra cycle the hardware engineers give us, we software engineers figure out some way to soak it up.

The terminal is not for everyone, but lately I've found it's the one environment that is instantaneous enough that my flow is not thrown off. For kicks, I recently installed XUbuntu on a $150 ARM Chromebook with the idea of mostly just using the terminal (and having a throwaway laptop that I'm not scared to use on the bus/train). I expected to mostly be using it as a dumb terminal to ssh into servers, but to my surprise, it has actually proven to be very capable at performing a wide range of local tasks in the terminal with good performance.

The Cursive framework

Anyone who has developed software with a GUI toolkit (e.g. Windows, GTK+, Java Swing, Cocoa, etc.) will find most Cursive concepts to be very familiar. Visual components are called "views" (some toolkits use the terms "widget" or "control" for the same concept), and are installed into a tree which is traversed when rendering. Some views may contain child views and are used for layout (e.g. BoxView and LinearLayout), while others are used as leaf nodes that provide information or interact with the user (e.g. Button, EditView, TextView, SliderView, etc.). Cursive can maintain multiple view trees as "screens" which can be switched between. Each screen's view tree has a StackView as the root element, whose children are subtree "layers" that can be pushed and popped.

Cursive provides an event model where the main program invokes Cursive::run() and the Cursive event loop will render views and dispatch to registered callbacks (typically Rust closures) as needed until Cursive::quit() is called, at which time the event loop exits. Alternately, the main program may choose to exercise more control by calling Cursive::step() as needed to perform a single iteration of input processing, event dispatch, and view rendering. Key events are processed by whichever input view currently has focus, and the user may cycle focus using the tab key.
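
A minimal Cursive program illustrating this event model might look like the following sketch (using the same Cursive version as the larger examples later in this post):

extern crate cursive;

use cursive::Cursive;
use cursive::views::TextView;

fn main() {
    let mut cursive = Cursive::new();

    // Push a single layer containing a text view onto the current screen.
    cursive.add_layer(TextView::new("Hello, terminal! Press 'q' to quit."));

    // Register a callback that exits the event loop when 'q' is pressed.
    cursive.add_global_callback('q', |c| c.quit());

    // Render views and dispatch events until quit() is called.
    cursive.run();
}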

Referencing views

Cursive diverges from other UI toolkits with respect to referencing views. In many environments, we would simply store references or pointers to any views that we need to reference later, in addition to whatever references are needed internally by the view tree to form the parent-child relationships. However, Rust's strict ownership model requires us to be very explicit about how we allow multiple references to the same memory.

After the main program instantiates and configures a view object, it generally adds it to the view tree by making it the child of an existing view (e.g. LinearLayout::add_child()) or adding it to a screen's StackView as a layer. Rust ownership of the object is moved at that time, and it is no longer directly accessible to the main program.

To access specific views after they have been integrated into a view tree, views may be wrapped in an IdView via .with_id(&str) which allows them to be referenced later using the provided string identifier. A borrowed mutable reference to the wrapped view may be retrieved with Cursive::find_id() or a closure operating on the view may be invoked with Cursive::call_on_id(). Under the hood, these methods provide interior mutability by making use of RefCell and its runtime borrow checking to provide the caller with a borrowed mutable reference.

The following code demonstrates how views can be referenced by providing a callback which copies text from one view to the other:

extern crate cursive;

use cursive::Cursive;
use cursive::event::Key;
use cursive::view::*;
use cursive::views::*;

fn main() {
    let mut cursive = Cursive::new();

    // Create a view tree with a TextArea for input, and a
    // TextView for output.
    cursive.add_layer(LinearLayout::horizontal()
        .child(BoxView::new(SizeConstraint::Fixed(10),
                            SizeConstraint::Fixed(10),
                            Panel::new(TextArea::new()
                                .content("")
                                .with_id("input"))))
        .child(BoxView::new(SizeConstraint::Fixed(10),
                            SizeConstraint::Fixed(10),
                            Panel::new(TextView::new("")
                                .with_id("output")))));
    cursive.add_global_callback(Key::Esc, |c| {
        // When the user presses Escape, update the output view
        // with the contents of the input view.
        let input = c.find_id::<TextArea>("input").unwrap();
        let mut output = c.find_id::<TextView>("output").unwrap();
        output.set_content(input.get_content());
    });

    cursive.run();
}

Early in my exploration of Cursive, this method of accessing views proved to be somewhat challenging: fetching references to two views in the same lexical scope would result in BorrowMutError panics, because the internals of the second find_id() would try to mutably borrow a reference to the first view while traversing the tree. Cursive's view lookup code has since been adjusted so that this is no longer an issue.

Model-View-Controller

While developing a full application, I quickly ran into BorrowMutError panics again. With application logic tied to my custom view implementations, and some such code needing to call methods on other custom views, inevitably some code would need to mutably borrow a view that was already borrowed somewhere further up the stack.

My solution was to completely decouple UI concerns from the application logic, resulting in something along the lines of the well-known Model-View-Controller (MVC) design pattern. A Ui struct encapsulates all Cursive operations, and a Controller struct contains all application logic. Each struct contains a message queue which allows one to receive messages sent by the other. These messages are simple enums whose variants may contain associated data specific to the message type.

Instead of calling Cursive::run(), the controller will provide its own main loop where each iteration will operate as follows:

  1. The controller main loop will call Ui::step().
  2. The Ui::step() method will process any messages that the controller may have added to its message queue. These messages allow the controller to change the UI state in various ways.
  3. The Ui::step() method will then step the Cursive UI with Cursive::step(). Cursive will block until input is received. Any pending UI events will be processed and any registered callbacks will be executed. Callbacks may result in messages being posted to the controller's message queue (for example, the contents of a dialog box's form).
  4. The controller main loop will then process any messages that the UI may have added to its message queue. The controller may perform tasks related to these messages, and optionally post messages to the UI's message queue to indicate the outcome.

This scheme worked great for my needs where it's okay for the program to completely block while waiting for user input.

For the message queue, I used Rust's std::sync::mpsc (multi-producer, single-consumer FIFO queue), which provides a convenient way for different code components to own a cloned Sender object that inserts elements into a shared queue. The use of mpsc is really overkill for the single-threaded applications I was working with, since any thread synchronization work being performed is wasted.

Here's an example of adapting the above text copy program to such an MVC model. It's admittedly much lengthier.

extern crate cursive;

use cursive::Cursive;
use cursive::event::Key;
use cursive::view::*;
use cursive::views::*;
use std::sync::mpsc;

pub struct Ui {
    cursive: Cursive,
    ui_rx: mpsc::Receiver<UiMessage>,
    ui_tx: mpsc::Sender<UiMessage>,
    controller_tx: mpsc::Sender<ControllerMessage>,
}

pub enum UiMessage {
    UpdateOutput(String),
}

impl Ui {
    /// Create a new Ui object.  The provided `mpsc` sender will be used
    /// by the UI to send messages to the controller.
    pub fn new(controller_tx: mpsc::Sender<ControllerMessage>) -> Ui {
        let (ui_tx, ui_rx) = mpsc::channel::<UiMessage>();
        let mut ui = Ui {
            cursive: Cursive::new(),
            ui_tx: ui_tx,
            ui_rx: ui_rx,
            controller_tx: controller_tx,
        };

        // Create a view tree with a TextArea for input, and a
        // TextView for output.
        ui.cursive.add_layer(LinearLayout::horizontal()
            .child(BoxView::new(SizeConstraint::Fixed(10),
                                SizeConstraint::Fixed(10),
                                Panel::new(TextArea::new()
                                    .content("")
                                    .with_id("input"))))
            .child(BoxView::new(SizeConstraint::Fixed(10),
                                SizeConstraint::Fixed(10),
                                Panel::new(TextView::new("")
                                    .with_id("output")))));

        // Configure a callback
        let controller_tx_clone = ui.controller_tx.clone();
        ui.cursive.add_global_callback(Key::Esc, move |c| {
            // When the user presses Escape, send an
            // UpdatedInputAvailable message to the controller.
            let input = c.find_id::<TextArea>("input").unwrap();
            let text = input.get_content().to_owned();
            controller_tx_clone.send(
                ControllerMessage::UpdatedInputAvailable(text))
                .unwrap();
        });
        ui
    }

    /// Step the UI by calling into Cursive's step function, then
    /// processing any UI messages.
    pub fn step(&mut self) -> bool {
        if !self.cursive.is_running() {
            return false;
        }

        // Process any pending UI messages
        while let Some(message) = self.ui_rx.try_iter().next() {
            match message {
                UiMessage::UpdateOutput(text) => {
                    let mut output = self.cursive
                        .find_id::<TextView>("output")
                        .unwrap();
                    output.set_content(text);
                }
            }
        }

        // Step the UI
        self.cursive.step();

        true
    }
}

pub struct Controller {
    rx: mpsc::Receiver<ControllerMessage>,
    ui: Ui,
}

pub enum ControllerMessage {
    UpdatedInputAvailable(String),
}

impl Controller {
    /// Create a new controller
    pub fn new() -> Result<Controller, String> {
        let (tx, rx) = mpsc::channel::<ControllerMessage>();
        Ok(Controller {
            rx: rx,
            ui: Ui::new(tx.clone()),
        })
    }
    /// Run the controller
    pub fn run(&mut self) {
        while self.ui.step() {
            while let Some(message) = self.rx.try_iter().next() {
                // Handle messages arriving from the UI.
                match message {
                    ControllerMessage::UpdatedInputAvailable(text) => {
                        self.ui
                            .ui_tx
                            .send(UiMessage::UpdateOutput(text))
                            .unwrap();
                    }
                };
            }
        }
    }
}

fn main() {
    // Launch the controller and UI
    let controller = Controller::new();
    match controller {
        Ok(mut controller) => controller.run(),
        Err(e) => println!("Error: {}", e),
    };
}

Miscellaneous notes

  • Cursive is very much a work in progress and there are still some rough edges to be worked out. However, Alexandre Bury is lightning fast at responding to bug reports and fixing issues. One recent issue I filed went from report to patch to commit in 14 minutes.
  • It's unclear how you would develop a lightweight single-threaded program that uses reactor-style asynchronous I/O dispatch: for example, a central select() loop which dispatches stdin/stdout events to Cursive, network socket events to other code, and so on. (I'm not sure whether backends such as ncurses would even support this.)
  • I'm also not sure how I would go about structuring a multi-threaded application where the UI needs to process events from other threads. Cursive does provide a Cursive::set_fps() method which, in conjunction with Cursive::cb_sink(), can poll for new events at specified time intervals. But I've always preferred a purely event-driven design for such things instead of needlessly burning cycles periodically while waiting. (Again, there may be complications at the ncurses layer.)
  • Cursive wants callback closures to have static lifetime, which can lead to some Rust puzzles if you'd like to access non-static non-owned items from within closures. This may be inevitable, and the issue mostly goes away with the MVC decoupling technique mentioned above.

As a learning exercise, I wrote a Cursive-based interface to UPM password manager databases. However, nobody should use it for reasons outlined in its README.