In this interlude, we will see an implementation of a multithreading system on top of coroutines.
As we saw earlier, coroutines allow a kind of
collaborative multithreading.
Each coroutine is equivalent to a thread.
A pair yield–resume switches control from one thread to another.
However, unlike regular multithreading,
coroutines are non-preemptive.
While a coroutine is running,
we cannot stop it from the outside.
It suspends execution only when it explicitly requests to do so,
through a call to yield.
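As a minimal sketch of this collaborative switching, the following code runs two coroutines in turn. The helper worker and the table log are names made up for this illustration; each coroutine runs undisturbed until it calls yield by itself.

```lua
local log = {}

-- hypothetical helper: creates a coroutine that logs two steps
local function worker (name)
  return coroutine.create(function ()
    for i = 1, 2 do
      log[#log + 1] = name .. i
      coroutine.yield()     -- the only point where control can leave
    end
  end)
end

local a, b = worker("a"), worker("b")

-- each pair yield-resume switches control from one thread to the other
for _ = 1, 2 do
  coroutine.resume(a)
  coroutine.resume(b)
end

print(table.concat(log, " "))   --> a1 b1 a2 b2
```

Nothing outside a coroutine can interrupt it between two calls to yield, so the order of the entries in log is fully determined by the program.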
For several applications, this is not a problem,
quite the opposite.
Programming is much easier in the absence of preemption.
We do not need to be paranoid about synchronization bugs,
because all synchronization among threads
is explicit in the program.
We just need to ensure that
a coroutine yields only when it is outside a critical region.
However, with non-preemptive multithreading, whenever any thread calls a blocking operation, the whole program blocks until the operation completes. For many applications, this behavior is unacceptable, which leads many programmers to disregard coroutines as a real alternative to conventional multithreading. As we will see here, this problem has an interesting (and obvious, with hindsight) solution.
Let us assume a typical multithreading situation: we want to download several remote files through HTTP. To download several remote files, first we must learn how to download one remote file. In this example, we will use the LuaSocket library. To download a file, we must open a connection to its site, send a request for the file, receive the file (in blocks), and close the connection. In Lua, we can write this task as follows. First, we load the LuaSocket library:
local socket = require "socket"
Then, we define the host and the file we want to download. In this example, we will download the Lua 5.3 manual from the Lua site:
host = "www.lua.org"
file = "/manual/5.3/manual.html"
Then, we open a TCP connection to port 80 (the standard port for HTTP connections) of that site:
c = assert(socket.connect(host, 80))
This operation returns a connection object, which we use to send the file request:
local request = string.format(
    "GET %s HTTP/1.0\r\nhost: %s\r\n\r\n", file, host)
c:send(request)
Next, we read the file in blocks of 1 kB, writing each block to the standard output:
repeat
  local s, status, partial = c:receive(2^10)
  io.write(s or partial)
until status == "closed"
The method receive returns either a string with
what it read or nil in case of error;
in the latter case, it also returns an error code (status)
and what it read until the error (partial).
When the host closes the connection,
we print that remaining input and break the receive loop.
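To see how these three results drive the loop without touching the network, we can try the same loop over a stand-in object. The function fakeconnection below is not part of LuaSocket; it is a hypothetical mock that only imitates the behavior just described, serving a string in blocks and reporting "closed" together with the leftover bytes.

```lua
-- Hypothetical stand-in for a connection (not part of LuaSocket):
-- it serves 'data' in blocks of n bytes; when fewer than n bytes
-- remain, it returns nil, "closed", and the remaining bytes.
local function fakeconnection (data)
  local pos = 1
  return {
    receive = function (self, n)
      local rest = #data - pos + 1
      if rest >= n then
        local s = string.sub(data, pos, pos + n - 1)
        pos = pos + n
        return s
      end
      local partial = string.sub(data, pos)
      pos = #data + 1
      return nil, "closed", partial
    end
  }
end

local c = fakeconnection("abcdefghij")
local blocks = {}
repeat
  local s, status, partial = c:receive(4)
  blocks[#blocks + 1] = s or partial   -- full block or leftover
until status == "closed"

print(table.concat(blocks, "|"))   --> abcd|efgh|ij
```

The last block arrives in partial, not in s, which is why the loop uses the expression s or partial.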
After downloading the file, we close the connection:
c:close()
Now that we know how to download one file,
let us return to the problem of downloading several files.
The trivial approach is to download one at a time.
However, this sequential approach,
where we start reading a file only after finishing the previous one,
is too slow.
When reading a remote file,
a program spends most of its time waiting for data to arrive.
More specifically, it spends most of its time blocked
in the call to receive.
So, the program could run much faster if
it downloaded all files concurrently.
Then, while a connection has no data available,
the program can read from another connection.
Clearly, coroutines offer a convenient way to structure these
simultaneous downloads.
We create a new thread for each download task.
When a thread has no data available,
it yields control to a simple dispatcher,
which invokes another thread.
To rewrite the program with coroutines, we first rewrite the previous download code as a function. The result is in Figure 26.1, “Function to download a Web page”.
Figure 26.1. Function to download a Web page
function download (host, file)
  local c = assert(socket.connect(host, 80))
  local count = 0    -- counts number of bytes read
  local request = string.format(
      "GET %s HTTP/1.0\r\nhost: %s\r\n\r\n", file, host)
  c:send(request)
  while true do
    local s, status = receive(c)
    count = count + #s
    if status == "closed" then
      break
    end
  end
  c:close()
  print(file, count)
end
Because we are not interested in the remote file contents, this function counts and prints the file size, instead of writing the file to the standard output. (With several threads reading several files, the output would shuffle all files.)
In this new code, we use an auxiliary function (receive)
to receive data from the connection.
In the sequential approach, its code would be like this:
function receive (connection)
  local s, status, partial = connection:receive(2^10)
  return s or partial, status
end
For the concurrent implementation, this function must receive data without blocking. Instead, if there is not enough data available, it yields. The new code is like this:
function receive (connection)
  connection:settimeout(0)    -- do not block
  local s, status, partial = connection:receive(2^10)
  if status == "timeout" then
    coroutine.yield(connection)
  end
  return s or partial, status
end
The call to settimeout(0) makes any operation over the
connection a non-blocking operation.
When the resulting status is "timeout",
it means that the operation returned without completion.
In this case, the thread yields.
The non-false argument passed to yield
signals to the dispatcher that
the thread is still performing its task.
Note that, even in case of a timeout,
the connection returns what it read until the timeout,
which is in the variable partial.
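We can exercise this yielding behavior without a real socket. In the following sketch, fakeconnection is again a hypothetical mock, not a LuaSocket function: it follows a small script in which false stands for "no data yet". The function receive repeats the logic described above, and a plain loop plays the role of the dispatcher.

```lua
-- Hypothetical fake connection: each call to receive consumes one
-- entry of 'script'; false means a timeout, and the end of the
-- script means that the connection was closed.
local function fakeconnection (script)
  local step = 0
  return {
    settimeout = function () end,   -- accepted and ignored by the fake
    receive = function (self, n)
      step = step + 1
      local item = script[step]
      if item == nil then return nil, "closed", "" end
      if item == false then return nil, "timeout", "" end
      return item
    end
  }
end

-- same logic as the non-blocking receive in the text
local function receive (connection)
  connection:settimeout(0)
  local s, status, partial = connection:receive(2^10)
  if status == "timeout" then
    coroutine.yield(connection)
  end
  return s or partial, status
end

local conn = fakeconnection{"abc", false, "de"}
local count, yields = 0, 0
local task = coroutine.wrap(function ()
  while true do
    local s, status = receive(conn)
    count = count + #s
    if status == "closed" then break end
  end
end)

while task() do          -- a true result means "still working"
  yields = yields + 1
end

print(count, yields)   --> 5       1
```

The task yields exactly once, at the scripted timeout, and still counts all five bytes.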
Figure 26.2, “The dispatcher” shows the dispatcher plus some auxiliary code.
Figure 26.2. The dispatcher
tasks = {}    -- list of all live tasks

function get (host, file)
  -- create coroutine for a task
  local co = coroutine.wrap(function ()
    download(host, file)
  end)
  -- insert it in the list
  table.insert(tasks, co)
end

function dispatch ()
  local i = 1
  while true do
    if tasks[i] == nil then    -- no other tasks?
      if tasks[1] == nil then  -- list is empty?
        break                  -- break the loop
      end
      i = 1                    -- else restart the loop
    end
    local res = tasks[i]()     -- run a task
    if not res then            -- task finished?
      table.remove(tasks, i)
    else
      i = i + 1                -- go to next task
    end
  end
end
The table tasks keeps a list of all live tasks
for the dispatcher.
The function get ensures that each download task
runs in an individual thread.
The dispatcher itself
is mainly a loop that goes through all tasks,
resuming them one by one.
It must also remove from the list
the tasks that have finished.
It stops the loop when there are no more tasks to run.
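To watch the order in which the dispatcher resumes its tasks, we can feed the same loop with simulated tasks instead of downloads. In this sketch, addtask and log are names invented for the illustration; each task yields a true value a fixed number of times and then finishes.

```lua
local tasks = {}
local log = {}

-- hypothetical task: yields 'steps' times, then finishes
local function addtask (name, steps)
  table.insert(tasks, coroutine.wrap(function ()
    for i = 1, steps do
      log[#log + 1] = name .. i
      coroutine.yield(true)     -- true: "still working"
    end
    log[#log + 1] = name .. " done"
  end))
end

-- same loop as the dispatcher in Figure 26.2
local function dispatch ()
  local i = 1
  while true do
    if tasks[i] == nil then      -- no other tasks?
      if tasks[1] == nil then    -- list is empty?
        break
      end
      i = 1                      -- else restart the loop
    end
    local res = tasks[i]()       -- run a task
    if not res then              -- task finished?
      table.remove(tasks, i)
    else
      i = i + 1                  -- go to next task
    end
  end
end

addtask("A", 2)
addtask("B", 1)
dispatch()

print(table.concat(log, " "))   --> A1 B1 A2 B done A done
```

Note that, after removing a finished task, the dispatcher does not increment i, because the next task has moved into the same position.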
Finally, the main program creates the tasks it needs and calls the dispatcher. To download some distributions from the Lua site, the main program could be like this:
get("www.lua.org", "/ftp/lua-5.3.2.tar.gz")
get("www.lua.org", "/ftp/lua-5.3.1.tar.gz")
get("www.lua.org", "/ftp/lua-5.3.0.tar.gz")
get("www.lua.org", "/ftp/lua-5.2.4.tar.gz")
get("www.lua.org", "/ftp/lua-5.2.3.tar.gz")

dispatch()    -- main loop
The sequential implementation takes fifteen seconds to download these files, on my machine. This implementation with coroutines runs more than three times faster.
Despite the speedup, this last implementation is far from optimal. Everything goes fine while at least one thread has something to read. However, when no thread has data to read, the dispatcher does a busy wait, going from thread to thread only to check that they still have no data. As a result, this coroutine implementation uses three orders of magnitude more CPU than the sequential solution.
To avoid this behavior,
we can use the function select from LuaSocket:
it allows a program to block while waiting
for a status change in a group of sockets.
The changes in our implementation are small:
we have to change only the dispatcher,
as shown in Figure 26.3, “Dispatcher using select”.
Figure 26.3. Dispatcher using select
function dispatch ()
  local i = 1
  local timedout = {}
  while true do
    if tasks[i] == nil then    -- no other tasks?
      if tasks[1] == nil then  -- list is empty?
        break                  -- break the loop
      end
      i = 1                    -- else restart the loop
      timedout = {}
    end
    local res = tasks[i]()     -- run a task
    if not res then            -- task finished?
      table.remove(tasks, i)
    else                       -- time out
      i = i + 1
      timedout[#timedout + 1] = res
      if #timedout == #tasks then  -- all tasks blocked?
        socket.select(timedout)    -- wait
      end
    end
  end
end
Along the loop,
this new dispatcher collects the timed-out connections
in the table timedout.
(Remember that receive passes such connections to yield,
thus resume returns them.)
If all connections time out,
the dispatcher calls select
to wait for any of these connections to change status.
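The effect of waiting instead of polling can be sketched with simulated connections. In the following code, fakeselect is a hypothetical stand-in for socket.select: instead of blocking, it marks the first waiting connection as ready and counts the call. The function addtask and the counters resumes and waits are also made up for this illustration; the loop itself follows Figure 26.3.

```lua
local tasks, resumes, waits = {}, 0, 0

-- Hypothetical stand-in for socket.select: it does not block; it
-- marks the first waiting "connection" as ready and counts the call.
local function fakeselect (conns)
  waits = waits + 1
  conns[1].ready = true
end

-- hypothetical task: yields its connection until data is ready
local function addtask ()
  local conn = {ready = false}
  table.insert(tasks, coroutine.wrap(function ()
    while not conn.ready do
      coroutine.yield(conn)
    end
  end))
end

local function dispatch ()
  local i = 1
  local timedout = {}
  while true do
    if tasks[i] == nil then        -- no other tasks?
      if tasks[1] == nil then      -- list is empty?
        break
      end
      i = 1                        -- else restart the loop
      timedout = {}
    end
    resumes = resumes + 1
    local res = tasks[i]()         -- run a task
    if not res then                -- task finished?
      table.remove(tasks, i)
    else                           -- time out
      i = i + 1
      timedout[#timedout + 1] = res
      if #timedout == #tasks then  -- all tasks blocked?
        fakeselect(timedout)       -- wait
      end
    end
  end
end

addtask(); addtask(); addtask()
dispatch()

print(resumes, waits)   --> 9       3
```

In this simulation, the dispatcher never goes through the list twice without some connection becoming ready in between, so the number of resumes stays small and fixed. With a polling dispatcher, the number of resumes would grow with the time that the data takes to arrive.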
This final implementation runs as fast as the previous
coroutine-based implementation.
Moreover, as it does not do busy waits,
it uses just as much CPU as the sequential implementation.