26 Interlude: Multithreading with Coroutines

In this interlude, we will see an implementation of a multithreading system on top of coroutines.

As we saw earlier, coroutines allow a kind of collaborative multithreading. Each coroutine is equivalent to a thread. A pair yield–resume switches control from one thread to another. However, unlike regular multithreading, coroutines are non-preemptive. While a coroutine is running, we cannot stop it from the outside. It suspends execution only when it explicitly asks to, through a call to yield. For several applications, this is not a problem, quite the opposite. Programming is much easier in the absence of preemption. We do not need to be paranoid about synchronization bugs, because all synchronization among threads is explicit in the program. We just need to ensure that a coroutine yields only when it is outside a critical region.
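As a small illustration of this explicit switching, the following sketch runs two coroutines in turn. The names worker and log are hypothetical, chosen only for this example; control changes hands only at the calls to yield.

```lua
-- Two cooperative "threads": each one runs until it calls coroutine.yield.
local log = {}

local function worker (name, steps)
  return coroutine.create(function ()
    for i = 1, steps do
      log[#log + 1] = name .. i
      coroutine.yield()          -- the only point where control can switch
    end
  end)
end

local a, b = worker("a", 2), worker("b", 2)

-- a trivial round-robin loop: resume each thread until both are dead
while coroutine.status(a) ~= "dead" or coroutine.status(b) ~= "dead" do
  if coroutine.status(a) ~= "dead" then coroutine.resume(a) end
  if coroutine.status(b) ~= "dead" then coroutine.resume(b) end
end

print(table.concat(log, " "))    --> a1 b1 a2 b2
```

Because neither thread can be interrupted between two yields, the order of the log entries is fully determined by the program.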

However, with non-preemptive multithreading, whenever any thread calls a blocking operation, the whole program blocks until the operation completes. For many applications, this behavior is unacceptable, which leads many programmers to disregard coroutines as a real alternative to conventional multithreading. As we will see here, this problem has an interesting (and obvious, with hindsight) solution.

Let us assume a typical multithreading situation: we want to download several remote files through HTTP. To download several remote files, first we must learn how to download one remote file. In this example, we will use the LuaSocket library. To download a file, we must open a connection to its site, send a request for the file, receive the file (in blocks), and close the connection. In Lua, we can write this task as follows. First, we load the LuaSocket library:

      local socket = require "socket"

Then, we define the host and the file we want to download. In this example, we will download the Lua 5.3 manual from the Lua site:

      host = "www.lua.org"
      file = "/manual/5.3/manual.html"

Then, we open a TCP connection to port 80 (the standard port for HTTP connections) of that site:

      c = assert(socket.connect(host, 80))

This operation returns a connection object, which we use to send the file request:

      local request = string.format(
          "GET %s HTTP/1.0\r\nhost: %s\r\n\r\n", file, host)
      c:send(request)

Next, we read the file in blocks of 1 kB, writing each block to the standard output:

      repeat
        local s, status, partial = c:receive(2^10)
        io.write(s or partial)
      until status == "closed"

The method receive returns either a string with what it read or nil in case of error; in the latter case, it also returns an error code (status) and what it read until the error (partial). When the host closes the connection, we print that remaining input and break the receive loop.
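To see these three results without a network, we can try the same loop against a stand-in object. The function fakeConnection below is hypothetical, not part of LuaSocket; it only imitates the return convention just described, assuming that a short final block arrives together with the status "closed".

```lua
-- fakeConnection is a hypothetical stand-in for a LuaSocket TCP object.
-- It serves 'data' in blocks and reports "closed" with the leftover bytes.
local function fakeConnection (data)
  local pos = 1
  return {
    receive = function (self, n)
      local chunk = data:sub(pos, pos + n - 1)
      pos = pos + n
      if #chunk == n then
        return chunk                  -- a full block: no status, no partial
      end
      return nil, "closed", chunk     -- short block: nil, status, partial
    end
  }
end

local c = fakeConnection("abcdefgh")
local parts = {}
repeat
  local s, status, partial = c:receive(3)
  parts[#parts + 1] = s or partial    -- keep whatever was read
until status == "closed"

print(table.concat(parts, "|"))       --> abc|def|gh
```

The last block ("gh") arrives through partial, which is why the loop uses the expression s or partial.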

After downloading the file, we close the connection:

      c:close()

Now that we know how to download one file, let us return to the problem of downloading several files. The trivial approach is to download one at a time. However, this sequential approach, where we start reading a file only after finishing the previous one, is too slow. When reading a remote file, a program spends most of its time waiting for data to arrive. More specifically, it spends most of its time blocked in the call to receive. So, the program could run much faster if it downloaded all files concurrently. Then, while a connection has no data available, the program can read from another connection. Clearly, coroutines offer a convenient way to structure these simultaneous downloads. We create a new thread for each download task. When a thread has no data available, it yields control to a simple dispatcher, which invokes another thread.

To rewrite the program with coroutines, we first rewrite the previous download code as a function. The result is in Figure 26.1, “Function to download a Web page”.

Because we are not interested in the remote file contents, this function counts and prints the file size, instead of writing the file to the standard output. (With several threads reading several files, the output would shuffle all files.)

In this new code, we use an auxiliary function (receive) to receive data from the connection. In the sequential approach, its code would be like this:

      function receive (connection)
        local s, status, partial = connection:receive(2^10)
        return s or partial, status
      end
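With this helper in place, the download function described above might be sketched as follows. This is a reconstruction from the description, not necessarily the exact code of the figure. The optional parameter connect and the returned count are additions of this sketch, made only so that the function can be exercised without a network; by default it uses socket.connect from LuaSocket.

```lua
-- sequential helper, repeated here so that the sketch is self-contained
local function receive (connection)
  local s, status, partial = connection:receive(2^10)
  return s or partial, status
end

-- 'connect' is an assumption of this sketch: an optional replacement
-- for socket.connect, useful for testing
local function download (host, file, connect)
  connect = connect or require("socket").connect
  local c = assert(connect(host, 80))
  local count = 0                  -- counts number of bytes read
  local request = string.format(
      "GET %s HTTP/1.0\r\nhost: %s\r\n\r\n", file, host)
  c:send(request)
  while true do
    local s, status = receive(c)
    count = count + #s
    if status == "closed" then break end
  end
  c:close()
  print(file, count)
  return count                     -- returned only for convenience
end

-- needs LuaSocket and network access:
-- download("www.lua.org", "/manual/5.3/manual.html")
```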

For the concurrent implementation, the function receive must get its data without blocking: if there is not enough data available, it yields. The new code is like this:

      function receive (connection)
        connection:settimeout(0)      -- do not block
        local s, status, partial = connection:receive(2^10)
        if status == "timeout" then
          coroutine.yield(connection)
        end
        return s or partial, status
      end

The call to settimeout(0) makes every operation on the connection non-blocking. When the resulting status is "timeout", it means that the operation returned without completion. In this case, the thread yields. The non-false argument passed to yield signals to the dispatcher that the thread is still performing its task. Note that, even in case of a timeout, the connection returns what it read until the timeout, which is in the variable partial.
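The following sketch shows this yield from the point of view of whoever resumes the thread. The table fake is a hypothetical stand-in for a connection: its first call to receive times out with nothing read, and its second call delivers four bytes together with the status "closed".

```lua
local function receive (connection)
  connection:settimeout(0)      -- do not block
  local s, status, partial = connection:receive(2^10)
  if status == "timeout" then
    coroutine.yield(connection)
  end
  return s or partial, status
end

-- hypothetical stand-in for a LuaSocket connection
local fake = {
  calls = 0,
  settimeout = function () end,
  receive = function (self)
    self.calls = self.calls + 1
    if self.calls == 1 then
      return nil, "timeout", ""   -- nothing available yet
    end
    return nil, "closed", "data"  -- last bytes, then end of stream
  end,
}

local co = coroutine.create(function ()
  local total = 0
  while true do
    local s, status = receive(fake)
    total = total + #s
    if status == "closed" then break end
  end
  return total
end)

local ok1, r1 = coroutine.resume(co)   -- thread yields the connection
local ok2, r2 = coroutine.resume(co)   -- thread runs to completion
print(r1 == fake, r2)                  --> true    4
```

The first resume returns the connection itself, a non-false value; the second returns the result of the finished thread.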

Figure 26.2, “The dispatcher” shows the dispatcher plus some auxiliary code.

The table tasks keeps a list of all live tasks for the dispatcher. The function get ensures that each download task runs in an individual thread. The dispatcher itself is mainly a loop that goes through all tasks, resuming them one by one. It must also remove from the list the tasks that have finished. It stops the loop when there are no more tasks to run.
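A dispatcher along these lines might be sketched as follows; it is a reconstruction from the description above, not necessarily the exact code of the figure. To keep the sketch runnable without a network, download here is a stand-in that merely yields a given number of times, and the parameter rounds and the table finished are hypothetical additions.

```lua
local finished = {}     -- records the order in which tasks finish

-- stand-in for the real download: yields a non-false value while "working"
local function download (host, file, rounds)
  for _ = 1, rounds do coroutine.yield(true) end
  finished[#finished + 1] = file
end

local tasks = {}        -- list of all live tasks

local function get (host, file, rounds)
  -- create a coroutine for the task and insert it in the list
  local co = coroutine.wrap(function () download(host, file, rounds) end)
  table.insert(tasks, co)
end

local function dispatch ()
  local i = 1
  while true do
    if tasks[i] == nil then        -- no other tasks?
      if tasks[1] == nil then      -- list is empty?
        break                      -- break the loop
      end
      i = 1                        -- else restart the loop
    end
    local res = tasks[i]()         -- run a task
    if not res then                -- task finished?
      table.remove(tasks, i)
    else
      i = i + 1                    -- go to next task
    end
  end
end

get("example.org", "/a", 2)
get("example.org", "/b", 0)
get("example.org", "/c", 1)
dispatch()
print(table.concat(finished, " "))   --> /b /c /a
```

Tasks that need fewer rounds finish first, regardless of the order in which they were created.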

Finally, the main program creates the tasks it needs and calls the dispatcher. To download some distributions from the Lua site, the main program could be like this:

      get("www.lua.org", "/ftp/lua-5.3.2.tar.gz")
      get("www.lua.org", "/ftp/lua-5.3.1.tar.gz")
      get("www.lua.org", "/ftp/lua-5.3.0.tar.gz")
      get("www.lua.org", "/ftp/lua-5.2.4.tar.gz")
      get("www.lua.org", "/ftp/lua-5.2.3.tar.gz")
      
      dispatch()   -- main loop

The sequential implementation takes fifteen seconds to download these files on my machine. This implementation with coroutines runs more than three times faster.

Despite the speedup, this last implementation is far from optimal. Everything goes fine while at least one thread has something to read. However, when no thread has data to read, the dispatcher does a busy wait, going from thread to thread only to check that they still have no data. As a result, this coroutine implementation uses three orders of magnitude more CPU than the sequential solution.

To avoid this behavior, we can use the function select from LuaSocket: it allows a program to block while waiting for a status change in a group of sockets. The changes in our implementation are small: we have to change only the dispatcher, as shown in Figure 26.3, “Dispatcher using select”.

As it goes through the loop, this new dispatcher collects the timed-out connections in the table timedout. (Remember that receive passes such connections to yield, thus resume returns them.) If all connections time out, the dispatcher calls select to wait for any of these connections to change status. This final implementation runs as fast as the previous coroutine implementation. Moreover, as it does not do busy waits, it uses just as much CPU as the sequential implementation.
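A sketch of such a dispatcher follows; again, it is a reconstruction from the description, not necessarily the exact code of the figure. The parameter select is an addition of this sketch: by default it is socket.select from LuaSocket, but here we pass a hypothetical fakeSelect that only counts its calls. The function stalled, also hypothetical, creates a task that yields a stand-in connection a given number of times.

```lua
local tasks = {}
local selectCalls = 0

-- hypothetical replacement for socket.select: just counts the calls
local function fakeSelect (recvt) selectCalls = selectCalls + 1 end

local function dispatch (select)
  select = select or require("socket").select
  local i = 1
  local timedout = {}
  while true do
    if tasks[i] == nil then            -- no other tasks?
      if tasks[1] == nil then          -- list is empty?
        break                          -- break the loop
      end
      i = 1                            -- else restart the loop
      timedout = {}
    end
    local res = tasks[i]()             -- run a task
    if not res then                    -- task finished?
      table.remove(tasks, i)
    else                               -- time out
      i = i + 1
      timedout[#timedout + 1] = res
      if #timedout == #tasks then      -- all tasks blocked?
        select(timedout)               -- wait
      end
    end
  end
end

-- a task that "times out" a given number of rounds before finishing
local function stalled (rounds)
  local conn = {}                      -- stand-in for a connection
  return coroutine.wrap(function ()
    for _ = 1, rounds do coroutine.yield(conn) end
  end)
end

tasks[1], tasks[2] = stalled(2), stalled(1)
dispatch(fakeSelect)
print(selectCalls)                     --> 1
```

In this run, select is called only in the first pass, when both tasks time out; in the second pass one of the tasks finishes, so the dispatcher keeps going without waiting.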

Exercise 26.1: Implement and run the code presented in this chapter.
