In the long run it is not advisable to write large concurrent programs in machine-oriented languages that permit unrestricted use of store locations and their addresses. There is just no way we will be able to make such programs reliable (even with the help of complicated hardware mechanisms).
Per Brinch Hansen (1977)
Patterns for communication are patterns for parallelism.
Whit Morriss
If your attitude toward concurrency has changed over the course of your career, you’re not alone. It’s a common story.
At first, writing concurrent code is easy and fun. The tools—threads, locks, queues, and so on—are a snap to pick up and use. There are a lot of pitfalls, it’s true, but fortunately you know what they all are, and you are careful not to make mistakes.
At some point, you have to debug someone else’s multithreaded code, and you’re forced to conclude that some people really should not be using these tools.
Then at some point you have to debug your own multithreaded code.
Experience inculcates a healthy skepticism, if not outright cynicism, toward all multithreaded code. This is helped along by the occasional article explaining in mind-numbing detail why some obviously correct multithreading idiom does not work at all. (It has to do with “the memory model.”) But you eventually find one approach to concurrency that you think you can realistically use without constantly making mistakes. You can shoehorn pretty much everything into that idiom, and (if you’re really good) you learn to say “no” to added complexity.
Of course, there are rather a lot of idioms. Approaches that systems programmers commonly use include the following:
A background thread that has a single job and periodically wakes up to do it.
General-purpose worker pools that communicate with clients via task queues.
Pipelines where data flows from one thread to the next, with each thread doing a little of the work.
Data parallelism, where it is assumed (rightly or wrongly) that the whole computer will mainly just be doing one large computation, which is therefore split into n pieces and run on n threads in the hopes of putting all n of the machine’s cores to work at once.
A sea of synchronized objects, where multiple threads have access to the same data, and races are avoided using ad hoc locking schemes based on low-level primitives like mutexes. (Java includes built-in support for this model, which was quite popular during the 1990s and 2000s.)
Atomic integer operations allow multiple cores to communicate by passing information through fields the size of one machine word. (This is even harder to get right than all the others, unless the data being exchanged is literally just integer values. In practice, it’s usually pointers.)
In time, you may come to be able to use several of these approaches and combine them safely. You are a master of the art. And things would be great, if only nobody else were ever allowed to modify the system in any way. Programs that use threads well are full of unwritten rules.
Rust offers a better way to use concurrency, not by forcing all programs to adopt a single style (which for systems programmers would be no solution at all), but by supporting multiple styles safely. The unwritten rules are written down—in code—and enforced by the compiler.
You’ve heard that Rust lets you write safe, fast, concurrent programs. This is the chapter where we show you how it’s done. We’ll cover three ways to use Rust threads:
Fork-join parallelism
Channels
Shared mutable state
Along the way, you’re going to use everything you’ve learned so far about the Rust language. The care Rust takes with references, mutability, and lifetimes is valuable enough in single-threaded programs, but it is in concurrent programming that the true significance of those rules becomes apparent. They make it possible to expand your toolbox, to hack multiple styles of multithreaded code quickly and correctly—without skepticism, without cynicism, without fear.
The simplest use cases for threads arise when we have several completely independent tasks that we’d like to do at once.
For example, suppose we’re doing natural language processing on a large corpus of documents. We could write a loop:
fn process_files(filenames: Vec<String>) -> io::Result<()> {
    for document in filenames {
        let text = load(&document)?;  // read source file
        let results = process(text); // compute statistics
        save(&document, results)?;   // write output file
    }
    Ok(())
}
The program would run as shown in Figure 19-1.
Since each document is processed separately, it’s relatively easy to speed this task up by splitting the corpus into chunks and processing each chunk on a separate thread, as shown in Figure 19-2.
This pattern is called fork-join parallelism. To fork is to start a new thread, and to join a thread is to wait for it to finish. We’ve already seen this technique: we used it to speed up the Mandelbrot program in Chapter 2.
Fork-join parallelism is attractive for a few reasons:
It’s dead simple. Fork-join is easy to implement, and Rust makes it easy to get right.
It avoids bottlenecks. There’s no locking of shared resources in fork-join. The only time any thread has to wait for another is at the end. In the meantime, each thread can run freely. This helps keep task-switching overhead low.
The performance math is straightforward. In the best case, by starting four threads, we can finish our work in a quarter of the time. Figure 19-2 shows one reason we shouldn’t expect this ideal speed-up: we might not be able to distribute the work evenly across all threads. Another reason for caution is that sometimes fork-join programs must spend some time after the threads join, combining the results computed by the threads. That is, isolating the tasks completely may make some extra work. Still, apart from those two things, any CPU-bound program with isolated units of work can expect a significant boost.
It’s easy to reason about program correctness. A fork-join program is deterministic as long as the threads are really isolated, like the compute threads in the Mandelbrot program. The program always produces the same result, regardless of variations in thread speed. It’s a concurrency model without race conditions.
The main disadvantage of fork-join is that it requires isolated units of work. Later in this chapter, we’ll consider some problems that don’t split up so cleanly.
For now, let’s stick with the natural language processing example. We’ll show a few ways of applying the fork-join pattern to the process_files function.
The function std::thread::spawn starts a new thread:
spawn(|| {
    println!("hello from a child thread");
})
It takes one argument, a FnOnce closure or function. Rust starts a new thread to run the code of that closure or function. The new thread is a real operating system thread with its own stack, just like threads in C++, C#, and Java.
Here’s a more substantial example, using spawn to implement a parallel version of the process_files function from before:
use std::thread::spawn;

fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
    // Divide the work into several chunks.
    const NTHREADS: usize = 8;
    let worklists = split_vec_into_chunks(filenames, NTHREADS);

    // Fork: Spawn a thread to handle each chunk.
    let mut thread_handles = vec![];
    for worklist in worklists {
        thread_handles.push(
            spawn(move || process_files(worklist))
        );
    }

    // Join: Wait for all threads to finish.
    for handle in thread_handles {
        handle.join().unwrap()?;
    }

    Ok(())
}
Let’s take this function line by line.
fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
Our new function has the same type signature as the original process_files, making it a handy drop-in replacement.
    // Divide the work into several chunks.
    const NTHREADS: usize = 8;
    let worklists = split_vec_into_chunks(filenames, NTHREADS);
We use a utility function split_vec_into_chunks, not shown here, to divide up the work. The result, worklists, is a vector of vectors. It contains eight evenly sized slices of the original vector filenames.
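One way such a helper might be written is sketched below; the exact chunking strategy is just an illustration, not the program's actual code:

// One possible implementation (not shown in the text): split `v` into at
// most `nchunks` pieces of roughly equal length, preserving order.
fn split_vec_into_chunks(mut v: Vec<String>, nchunks: usize) -> Vec<Vec<String>> {
    let nchunks = nchunks.max(1);
    let chunk_size = (v.len() + nchunks - 1) / nchunks;  // ceiling division
    let mut chunks = Vec::with_capacity(nchunks);
    while !v.is_empty() {
        let rest = v.split_off(chunk_size.min(v.len()));
        chunks.push(v);
        v = rest;
    }
    chunks
}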
    // Fork: Spawn a thread to handle each chunk.
    let mut thread_handles = vec![];
    for worklist in worklists {
        thread_handles.push(
            spawn(move || process_files(worklist))
        );
    }
We spawn a thread for each worklist. spawn() returns a value called a JoinHandle, which we’ll use later. For now, we put all the JoinHandles into a vector.
Note how we get the list of filenames into the worker thread:
worklist is defined and populated by the for loop, in the parent thread.
As soon as the move closure is created, worklist is moved into the closure.
spawn then moves the closure (including the worklist vector) over to the new child thread.
These moves are cheap. Like the Vec<String> moves we discussed in Chapter 4, the Strings are not cloned. In fact, nothing is allocated or freed. The only data moved is the Vec itself: three machine words.
Almost every thread you create needs both code and data to get started. Rust closures, conveniently, contain whatever code you want and whatever data you want.
Moving on:
    // Join: Wait for all threads to finish.
    for handle in thread_handles {
        handle.join().unwrap()?;
    }
We use the .join() method of the JoinHandles we collected earlier to wait for all eight threads to finish. Joining threads is often necessary for correctness, because a Rust program exits as soon as main returns, even if other threads are still running. Destructors are not called; the extra threads are just killed. If this isn’t what you want, be sure to join any threads you care about before returning from main.
If we manage to get through this loop, it means all eight child threads finished successfully. Our function therefore ends by returning Ok(()):
    Ok(())
}
The code we used to join the child threads in our example is trickier than it looks, because of error handling. Let’s revisit that line of code:
handle.join().unwrap()?;
The .join() method does two neat things for us.
First, handle.join() returns a std::thread::Result that’s an error if the child thread panicked. This makes threading in Rust dramatically more robust than in C++. In C++, an out-of-bounds array access is undefined behavior, and there’s no protecting the rest of the system from the consequences. In Rust, panic is safe and per thread. The boundaries between threads serve as a firewall for panic; panic doesn’t automatically spread from one thread to the threads that depend on it. Instead, a panic in one thread is reported as an error Result in other threads. The program as a whole can easily recover.
In our program, though, we don’t attempt any fancy panic handling. Instead, we immediately use .unwrap() on this Result, asserting that it is an Ok result and not an Err result. If a child thread did panic, then this assertion would fail, so the parent thread would panic too. We’re explicitly propagating panic from the child threads to the parent thread.
Second, handle.join() passes the return value from the child thread back to the parent thread. The closure we passed to spawn has a return type of io::Result<()>, because that’s what process_files returns. This return value isn’t discarded. When the child thread is finished, its return value is saved, and JoinHandle::join() transfers that value back to the parent thread.
The full type returned by handle.join() in this program is std::thread::Result<std::io::Result<()>>. The thread::Result is part of the spawn/join API; the io::Result is part of our app.
In our case, after unwrapping the thread::Result, we use the ? operator on the io::Result, explicitly propagating I/O errors from the child threads to the parent thread.
All of this may seem rather intricate. But consider that it’s just one line of code, and then compare this with other languages. The default behavior in Java and C# is for exceptions in child threads to be dumped to the terminal and then forgotten. In C++, the default is to abort the process. In Rust, errors are Result values (data) instead of exceptions (control flow). They’re delivered across threads just like any other value. Any time you use low-level threading APIs, you end up having to write careful error-handling code, but given that you have to write it, Result is very nice to have around.
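If propagating the panic isn’t what you want, you can handle it instead. Here is a minimal sketch (not part of our example program) that matches on the result of .join() rather than unwrapping it:

use std::thread::spawn;

fn main() {
    let handle = spawn(|| {
        panic!("oh no");
    });

    // join() returns Err if the child panicked; the parent can recover
    // instead of propagating the panic with .unwrap().
    match handle.join() {
        Ok(()) => println!("child finished normally"),
        Err(_) => println!("child panicked; the parent carries on"),
    }
}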
Suppose the analysis we’re doing requires a large database of English words and phrases:
// before
fn process_files(filenames: Vec<String>)

// after
fn process_files(filenames: Vec<String>, glossary: &GigabyteMap)
This glossary is going to be big, so we’re passing it in by reference. How can we update process_files_in_parallel to pass the glossary through to the worker threads?
The obvious change does not work:
fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)
    -> io::Result<()>
{
    ...
    for worklist in worklists {
        thread_handles.push(
            spawn(move || process_files(worklist, glossary))  // error
        );
    }
    ...
}
We’ve simply added a glossary argument to our function and passed it along to process_files. Rust complains:
error[E0477]: the type `[closure@...]` does not fulfill the required lifetime
--> concurrency_spawn_lifetimes.rs:35:13
|
35 | spawn(move || process_files(worklist, glossary)) // error
| ^^^^^
|
= note: type must satisfy the static lifetime
Rust is complaining about the lifetime of the closure we’re passing to spawn.
spawn launches independent threads. Rust has no way of knowing how long the child thread will run, so it assumes the worst: it assumes the child thread may keep running even after the parent thread has finished and all values in the parent thread are gone. Obviously, if the child thread is going to last that long, the closure it’s running needs to last that long too. But this closure has a bounded lifetime: it depends on the reference glossary, and references don’t last forever.
Note that Rust is right to reject this code! The way we’ve written this function, it is possible for one thread to hit an I/O error, causing process_files_in_parallel to bail out before the other threads are finished. Child threads could end up trying to use the glossary after the main thread has freed it. It would be a race—with undefined behavior as the prize, if the main thread should win. Rust can’t allow this.
It seems spawn is too open-ended to support sharing references across threads. Indeed, we already saw a case like this, in “Closures That Steal”. There, our solution was to transfer ownership of the data to the new thread, using a move closure. That won’t work here, since we have many threads that all need to use the same data. One safe alternative is to clone the whole glossary for each thread, but since it’s large, we want to avoid that. Fortunately, the standard library provides another way: atomic reference counting.
We described Arc in “Rc and Arc: Shared Ownership”. It’s time to put it to use:
use std::sync::Arc;

fn process_files_in_parallel(filenames: Vec<String>,
                             glossary: Arc<GigabyteMap>)
    -> io::Result<()>
{
    ...
    for worklist in worklists {
        // This call to .clone() only clones the Arc and bumps the
        // reference count. It does not clone the GigabyteMap.
        let glossary_for_child = glossary.clone();
        thread_handles.push(
            spawn(move || process_files(worklist, &glossary_for_child))
        );
    }
    ...
}
We have changed the type of glossary: to run the analysis in parallel, the caller must pass in an Arc<GigabyteMap>, a smart pointer to a GigabyteMap that’s been moved into the heap, by doing Arc::new(giga_map).
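For illustration, the caller side might look like this sketch (load_gigabyte_map and filenames are hypothetical):

use std::sync::Arc;

let giga_map = load_gigabyte_map()?;              // hypothetical: builds the GigabyteMap
let glossary = Arc::new(giga_map);                // move the map to the heap; refcount is 1
process_files_in_parallel(filenames, glossary)?;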
When we call glossary.clone(), we are making a copy of the Arc smart pointer, not the whole GigabyteMap. This amounts to incrementing a reference count.
With this change, the program compiles and runs, because it no longer depends on reference lifetimes. As long as any thread owns an Arc<GigabyteMap>, it will keep the map alive, even if the parent thread bails out early. There won’t be any data races, because data in an Arc is immutable.
The standard library’s spawn function is an important primitive, but it’s not designed specifically for fork-join parallelism. Better fork-join APIs have been built on top of it. For example, in Chapter 2 we used the Crossbeam library to split some work across eight threads. Crossbeam’s scoped threads support fork-join parallelism quite naturally.
The Rayon library, by Niko Matsakis, is another example. It provides two ways of running tasks concurrently:
extern crate rayon;

use rayon::prelude::*;

// "do 2 things in parallel"
let (v1, v2) = rayon::join(fn1, fn2);

// "do N things in parallel"
giant_vector.par_iter().for_each(|value| {
    do_thing_with_value(value);
});
rayon::join(fn1, fn2) simply calls both functions and returns both results. The .par_iter() method creates a ParallelIterator, a value with map, filter, and other methods, much like a Rust Iterator. In both cases, Rayon uses its own pool of worker threads to spread out the work when possible. You simply tell Rayon what tasks can be done in parallel; Rayon manages threads and distributes the work as best it can.
The diagrams in Figure 19-3 illustrate two ways of thinking about the call giant_vector.par_iter().for_each(...). (a) Rayon acts as though it spawns one thread per element in the vector. (b) Behind the scenes, Rayon has one worker thread per CPU core, which is more efficient. This pool of worker threads is shared by all your program’s threads. When thousands of tasks come in at once, Rayon divides the work.
Here’s a version of process_files_in_parallel using Rayon:
extern crate rayon;

use rayon::prelude::*;

fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)
    -> io::Result<()>
{
    filenames.par_iter()
        .map(|filename| process_file(filename, glossary))
        .reduce_with(|r1, r2| {
            if r1.is_err() { r1 } else { r2 }
        })
        .unwrap_or(Ok(()))
}
This code is shorter and less tricky than the version using std::thread::spawn. Let’s look at it line by line:
First, we use filenames.par_iter() to create a parallel iterator.
We use .map() to call process_file on each filename. This produces a ParallelIterator over a sequence of io::Result<()> values.
We use .reduce_with() to combine the results. Here we’re keeping the first error, if any, and discarding the rest. If we wanted to accumulate all the errors, or print them, we could do that here.
The .reduce_with() method is also handy when you pass a .map() closure that returns a useful value on success. Then you can pass .reduce_with() a closure that knows how to combine two success results.
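For example, here is a minimal sketch (not part of our program) where the .map() closure returns a plain value and .reduce_with() combines two partial results:

use rayon::prelude::*;

// Sum of squares, computed across Rayon's worker threads.
fn sum_of_squares(v: &[i64]) -> i64 {
    v.par_iter()
        .map(|&x| x * x)
        .reduce_with(|a, b| a + b)   // combine two partial sums
        .unwrap_or(0)                // empty input: no partial results at all
}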
reduce_with returns an Option that is None only if filenames was empty. We use the Option’s .unwrap_or() method to make the result Ok(()) in that case.
Behind the scenes, Rayon balances workloads across threads dynamically, using a technique called work-stealing. It will typically do a better job keeping all the CPUs busy than we can do by manually dividing the work in advance, as in “spawn and join”.
As a bonus, Rayon supports sharing references across threads. Any parallel processing that happens behind the scenes is guaranteed to be finished by the time reduce_with returns. This explains why we were able to pass glossary to process_file even though that closure will be called on multiple threads.
(Incidentally, it’s no coincidence that we’ve used a map method and a reduce method. The MapReduce programming model, popularized by Google and by Apache Hadoop, has a lot in common with fork-join. It can be seen as a fork-join approach to querying distributed data.)
Back in Chapter 2, we used fork-join concurrency to render the Mandelbrot set. This made rendering four times as fast—impressive, but not as impressive as it could be, considering that we had the program spawn eight worker threads and ran it on an eight-core machine!
The problem is that we didn’t distribute the workload evenly. Computing one pixel of the image amounts to running a loop (see “What the Mandelbrot Set Actually Is”). It turns out that the pale gray parts of the image, where the loop quickly exits, are much faster to render than the black parts, where the loop runs the full 255 iterations. So although we split the area into equal-sized horizontal bands, we were creating unequal workloads, as Figure 19-4 shows.
This is easy to fix using Rayon. We can just fire off a parallel task for each row of pixels in the output. This creates several hundred tasks that Rayon can distribute across its threads. Thanks to work-stealing, it won’t matter that the tasks vary in size. Rayon will balance the work as it goes.
Here is the code. The first line and the last line are part of the main function we showed back in “A Concurrent Mandelbrot Program”, but we’ve changed the rendering code, which is everything in between.
let mut pixels = vec![0; bounds.0 * bounds.1];

// Scope of slicing up `pixels` into horizontal bands.
{
    let bands: Vec<(usize, &mut [u8])> = pixels
        .chunks_mut(bounds.0)
        .enumerate()
        .collect();

    bands.into_par_iter()
        .weight_max()
        .for_each(|(i, band)| {
            let top = i;
            let band_bounds = (bounds.0, 1);
            let band_upper_left = pixel_to_point(bounds, (0, top),
                                                 upper_left, lower_right);
            let band_lower_right = pixel_to_point(bounds, (bounds.0, top + 1),
                                                  upper_left, lower_right);
            render(band, band_bounds, band_upper_left, band_lower_right);
        });
}

write_bitmap(&args[1], &pixels, bounds).expect("error writing PNG file");
First, we create bands, the collection of tasks that we will be passing to Rayon. Each task is just a tuple of type (usize, &mut [u8]): the row number, since the computation requires that; and the slice of pixels to fill in. We use the chunks_mut method to break the image buffer into rows, enumerate to attach a row number to each row, and collect to slurp all the number-slice pairs into a vector. (We need a vector because Rayon creates parallel iterators only out of arrays and vectors.)
Next, we turn bands into a parallel iterator, call .weight_max() as a hint to Rayon that these tasks are very CPU-intensive, and then use the .for_each() method to tell Rayon what work we want done.
Since we’re using Rayon, we must add these lines to main.rs:
extern crate rayon;
use rayon::prelude::*;
and to Cargo.toml:
[dependencies]
rayon = "0.4"
With these changes, the program now uses about 7.75 cores on an 8-core machine. It’s 75% faster than before, when we were dividing the work manually. And the code is a little shorter, reflecting the benefits of letting a crate do a job (work distribution) rather than doing it ourselves.
A channel is a one-way conduit for sending values from one thread to another. In other words, it’s a thread-safe queue.
Figure 19-5 illustrates how channels are used. They’re something like Unix pipes: one end is for sending data, and the other is for receiving. The two ends are typically owned by two different threads. But whereas Unix pipes are for sending bytes, channels are for sending Rust values. sender.send(item)
puts a single value into the channel; receiver.recv()
removes one. Ownership is transferred from the sending thread to the receiving thread. If the channel is empty, receiver.recv()
blocks until a value is sent.
With channels, threads can communicate by passing values to one another. It’s a very simple way for threads to work together without using locking or shared memory.
This is not a new technique. Erlang has had isolated processes and message passing for 30 years now. Unix pipes have been around for almost 50 years. We tend to think of pipes as providing flexibility and composability, not concurrency, but in fact, they do all of the above. An example of a Unix pipeline is shown in Figure 19-6. It is certainly possible for all three programs to be working at the same time.
Rust channels are faster than Unix pipes. Sending a value moves it rather than copying it, and moves are fast even when you’re moving data structures that contain many megabytes of data.
Over the next few sections, we’ll use channels to build a concurrent program that creates an inverted index, one of the key ingredients of a search engine. Every search engine works on a particular collection of documents. The inverted index is the database that tells which words appear where.
We’ll show the parts of the code that have to do with threads and channels. The complete program can be found at https://github.com/ProgrammingRust/fingertips. It’s short, about a thousand lines of code all told.
Our program is structured as a pipeline, as shown in Figure 19-7. Pipelines are only one of the many ways to use channels—we’ll discuss a few other uses later—but they’re a straightforward way to introduce concurrency into an existing single-threaded program.
We’ll use a total of five threads, each doing a distinct task. Each thread produces output continually over the lifetime of the program. The first thread, for example, simply reads the source documents from disk into memory, one by one. (We want a thread to do this because we’ll be writing the simplest possible code here, using File::open and read_to_string, which are blocking APIs. We don’t want the CPU to sit idle whenever the disk is working.) The output of this stage is one long String per document, so this thread is connected to the next thread by a channel of Strings.
Our program will begin by spawning the thread that reads files. Suppose documents is a Vec<PathBuf>, a vector of filenames. The code to start our file-reading thread looks like this:
use std::fs::File;
use std::io::prelude::*;  // for `Read::read_to_string`
use std::thread::spawn;
use std::sync::mpsc::channel;

let (sender, receiver) = channel();

let handle = spawn(move || {
    for filename in documents {
        let mut f = File::open(filename)?;
        let mut text = String::new();
        f.read_to_string(&mut text)?;

        if sender.send(text).is_err() {
            break;
        }
    }
    Ok(())
});
Channels are part of the std::sync::mpsc module. We’ll explain what this name means later; first, let’s look at how this code works. We start by creating a channel:
let (sender, receiver) = channel();
The channel function returns a pair of values: a sender and a receiver. The underlying queue data structure is an implementation detail that the standard library does not expose.
Channels are typed. We’re going to use this channel to send the text of each file, so we have a sender of type Sender<String> and a receiver of type Receiver<String>. We could have explicitly asked for a channel of strings, by writing channel::<String>(). Instead, we let Rust’s type inference figure it out.
let handle = spawn(move || {
As before, we’re using std::thread::spawn to start a thread. Ownership of sender (but not receiver) is transferred to the new thread via this move closure.
The next few lines of code simply read files from disk:
    for filename in documents {
        let mut f = File::open(filename)?;
        let mut text = String::new();
        f.read_to_string(&mut text)?;
After successfully reading a file, we send its text into the channel:
        if sender.send(text).is_err() {
            break;
        }
    }
sender.send(text) moves the value text into the channel. Ultimately, it will be moved again to whoever receives the value. Whether text contains 10 lines of text or 10 megabytes, this operation copies three machine words (the size of a String), and the corresponding receiver.recv() call will also copy three machine words.
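You can check that claim directly:

// A String is just a pointer, a capacity, and a length.
assert_eq!(std::mem::size_of::<String>(),
           3 * std::mem::size_of::<usize>());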
The send and recv methods both return Results, but these methods fail only if the other end of the channel has been dropped. A send call fails if the Receiver has been dropped, because otherwise the value would sit in the channel forever: without a Receiver, there’s no way for any thread to receive it. Likewise, a recv call fails if there are no values waiting in the channel and the Sender has been dropped, because otherwise recv would wait forever: without a Sender, there’s no way for any thread to send the next value. Dropping your end of a channel is the normal way of “hanging up,” closing the connection when you’re done with it.
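Here is a minimal sketch (not part of our program) of that “hanging up” behavior:

use std::sync::mpsc::channel;

let (sender, receiver) = channel();
sender.send("first").unwrap();
drop(sender);                              // hang up

assert_eq!(receiver.recv(), Ok("first"));  // values already sent still arrive
assert!(receiver.recv().is_err());         // no Sender left, so recv fails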
In our code, sender.send(text) will fail only if the receiver’s thread has exited early. This is typical for code that uses channels. Whether that happened deliberately or due to an error, it’s OK for our reader thread to quietly shut itself down.
When that happens, or the thread finishes reading all the documents, it returns Ok(()):
    Ok(())
});
Note that this closure returns a Result. If the thread encounters an I/O error, it exits immediately, and the error is stored in the thread’s JoinHandle.
Of course, just like any other programming language, Rust admits many other possibilities when it comes to error handling. When an error happens, we could just print it out using println! and move on to the next file. We could pass errors along via the same channel that we’re using for data, making it a channel of Results—or create a second channel just for errors. The approach we’ve chosen here is both lightweight and responsible: we get to use the ? operator, so there’s not a bunch of boilerplate code, or even an explicit try/catch as you might see in Java; and yet errors won’t pass silently.
For convenience, our program wraps all of this code in a function that returns both the receiver (which we haven’t used yet) and the new thread’s JoinHandle:
fn start_file_reader_thread(documents: Vec<PathBuf>)
    -> (Receiver<String>, JoinHandle<io::Result<()>>)
{
    let (sender, receiver) = channel();

    let handle = spawn(move || {
        ...
    });

    (receiver, handle)
}
Note that this function launches the new thread and immediately returns. We’ll write a function like this for each stage of our pipeline.
Now we have a thread running a loop that sends values. We can spawn a second thread running a loop that calls receiver.recv():
while let Ok(text) = receiver.recv() {
    do_something_with(text);
}
But Receivers are iterable, so there’s a nicer way to write this:
for text in receiver {
    do_something_with(text);
}
These two loops are equivalent. Either way we write it, if the channel happens to be empty when control reaches the top of the loop, the receiving thread will block until some other thread sends a value. The loop will exit normally when the channel is empty and the Sender has been dropped. In our program, that happens naturally when the reader thread exits. That thread is running a closure that owns the variable sender; when the closure exits, sender is dropped.
Now we can write code for the second stage of the pipeline:
fn start_file_indexing_thread(texts: Receiver<String>)
    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
{
    let (sender, receiver) = channel();

    let handle = spawn(move || {
        for (doc_id, text) in texts.into_iter().enumerate() {
            let index = InMemoryIndex::from_single_document(doc_id, text);
            if sender.send(index).is_err() {
                break;
            }
        }
    });

    (receiver, handle)
}
This function spawns a thread that receives String values from one channel (texts) and sends InMemoryIndex values to another channel (sender/receiver). This thread’s job is to take each of the files loaded in the first stage and turn each document into a little one-file, in-memory inverted index.
The main loop of this thread is straightforward. All the work of indexing a document is done by InMemoryIndex::from_single_document. We won’t show its source code here, but it’s a simple matter of splitting the input string along word boundaries, and then producing a map from words to lists of positions.
This stage doesn’t perform I/O, so it doesn’t have to deal with io::Errors. Instead of an io::Result<()>, it returns ().
The remaining three stages are similar in design. Each one consumes a Receiver created by the previous stage. Our goal for the rest of the pipeline is to merge all the small indexes into a single large index file on disk. The fastest way we found to do this is in three stages. We won’t show the code here, just the type signatures of these three functions. The full source is online.
First, we merge indexes in memory until they get unwieldy (stage 3):
fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
We write these large indexes to disk (stage 4):
fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,
                             output_dir: &Path)
    -> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)
Finally, if we have multiple large files, we merge them using a file-based merging algorithm (stage 5):
fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
    -> io::Result<()>
This last stage does not return a Receiver, because it’s the end of the line. It produces a single output file on disk. It doesn’t return a JoinHandle, because we don’t bother spawning a thread for this stage. The work is done on the caller’s thread.
Now we come to the code that launches the threads and checks for errors:
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
    -> io::Result<()>
{
    // Launch all five stages of the pipeline.
    let (texts,   h1) = start_file_reader_thread(documents);
    let (pints,   h2) = start_file_indexing_thread(texts);
    let (gallons, h3) = start_in_memory_merge_thread(pints);
    let (files,   h4) = start_index_writer_thread(gallons, &output_dir);
    let result = merge_index_files(files, &output_dir);

    // Wait for threads to finish, holding on to any errors that they encounter.
    let r1 = h1.join().unwrap();
    h2.join().unwrap();
    h3.join().unwrap();
    let r4 = h4.join().unwrap();

    // Return the first error encountered, if any.
    // (As it happens, h2 and h3 can't fail: those threads
    // are pure in-memory data processing.)
    r1?;
    r4?;
    result
}
As before, we use .join().unwrap() to explicitly propagate panics from child threads to the main thread. The only other unusual thing here is that instead of using ? right away, we set aside the io::Result values until we’ve joined all four threads.
This pipeline is 40% faster than the single-threaded equivalent. That’s not bad for an afternoon’s work, but paltry-looking next to the 675% boost we got for the Mandelbrot program. We clearly haven’t saturated either the system’s I/O capacity or all the CPU cores. What’s going on?
Pipelines are like assembly lines in a manufacturing plant: performance is limited by the throughput of the slowest stage. A brand-new, untuned assembly line may be as slow as unit production, but assembly lines reward targeted tuning. In our case, measurement shows that the second stage is the bottleneck. Our indexing thread uses .to_lowercase()
and .is_alphanumeric()
, so it spends a lot of time poking around in Unicode tables. The other stages downstream from indexing spend most of their time asleep in Receiver::recv
, waiting for input.
This means we should be able to go faster. As we address the bottlenecks, the degree of parallelism will rise. Now that you know how to use channels and our program is made of isolated pieces of code, it’s easy to see ways to address this first bottleneck. We could hand-optimize the code for the second stage, just like any other code; break up the work into two or more stages; or run multiple file-indexing threads at once.
The mpsc part of std::sync::mpsc stands for multi-producer, single-consumer, a terse description of the kind of communication Rust’s channels provide.
The channels in our sample program carry values from a single sender to a single receiver. This is a fairly common case. But Rust channels also support multiple senders, in case you need, say, a single thread that handles requests from many client threads, as shown in Figure 19-8.
Sender<T> implements the Clone trait. To get a channel with multiple senders, simply create a regular channel and clone the sender as many times as you like. You can move each Sender value to a different thread.
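A minimal sketch of multiple senders feeding one receiver (the thread count and messages here are just illustration):

use std::sync::mpsc::channel;
use std::thread::spawn;

fn main() {
    let (sender, receiver) = channel();

    let mut handles = vec![];
    for id in 0..3 {
        let sender = sender.clone();       // one Sender per worker thread
        handles.push(spawn(move || {
            sender.send(format!("hello from thread {}", id)).unwrap();
        }));
    }
    drop(sender);  // drop the original so the channel closes once the clones are gone

    for message in receiver {              // ends when every Sender has been dropped
        println!("{}", message);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}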
A Receiver<T> can’t be cloned, so if you need to have multiple threads receiving values from the same channel, you need a Mutex. We’ll show how to do it later in this chapter.
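As a preview, here is one hedged sketch of the idea (not the example we show later): wrap the Receiver in an Arc<Mutex<...>> so several workers can pull jobs from one queue.

use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver};
use std::thread::spawn;

fn main() {
    let (sender, receiver) = channel::<String>();
    let shared: Arc<Mutex<Receiver<String>>> = Arc::new(Mutex::new(receiver));

    let mut workers = vec![];
    for _ in 0..4 {
        let shared = shared.clone();
        workers.push(spawn(move || loop {
            // Lock the mutex and wait for the next value; the lock is
            // released at the end of this statement.
            let next = shared.lock().unwrap().recv();
            match next {
                Ok(text) => println!("got {} bytes", text.len()),
                Err(_) => break,           // the Sender is gone; we're done
            }
        }));
    }

    for word in vec!["alpha", "beta", "gamma"] {
        sender.send(word.to_string()).unwrap();
    }
    drop(sender);
    for worker in workers {
        worker.join().unwrap();
    }
}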
Rust channels are carefully optimized. When a channel is first created, Rust uses a special “one-shot” queue implementation. If you only ever send one object through the channel, the overhead is minimal. If you send a second value, Rust switches to a different queue implementation. It’s settling in for the long haul, really, preparing the channel to transfer many values while minimizing allocation overhead. And if you clone the Sender, Rust must fall back on yet another implementation, one that is safe when multiple threads are trying to send values at once. But even the slowest of these three implementations is a lock-free queue, so sending or receiving a value is at most a few atomic operations and a heap allocation, plus the move itself. System calls are needed only when the queue is empty and the receiving thread therefore needs to put itself to sleep. In this case, of course, traffic through your channel is not maxed out anyway.
Despite all that optimization work, there is one mistake that’s very easy for applications to make around channel performance: sending values faster than they can be received and processed. This causes an ever-growing backlog of values to accumulate in the channel. For example, in our program, we found that the file reader thread (stage 1) could load files much faster than the file indexing thread (stage 2) could index them. The result is that hundreds of megabytes of raw data would be read from disk and stuffed in the queue at once.
This kind of misbehavior costs memory and hurts locality. Even worse, the sending thread keeps running, using up CPU and other system resources to send ever more values just when those resources are most needed on the receiving end.
Here Rust again takes a page from Unix pipes. Unix uses an elegant trick to provide some backpressure, so that fast senders are forced to slow down: each pipe on a Unix system has a fixed size, and if a process tries to write to a pipe that’s momentarily full, the system simply blocks that process until there’s room in the pipe. The Rust equivalent is called a synchronous channel.
use std::sync::mpsc::sync_channel;

let (sender, receiver) = sync_channel(1000);
A synchronous channel is exactly like a regular channel except that when you create it, you specify how many values it can hold. For a synchronous channel, sender.send(value) is potentially a blocking operation. After all, the idea is that blocking is not always bad. In our example program, changing the channel in start_file_reader_thread to a sync_channel with room for 32 values cut memory usage by two-thirds on our benchmark data set, without decreasing throughput.
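That change is a one-liner inside start_file_reader_thread, sketched here:

use std::sync::mpsc::sync_channel;

// was: let (sender, receiver) = channel();
let (sender, receiver) = sync_channel(32);  // block the reader once 32 texts are queued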
So far we’ve been acting as though all values can be freely moved and shared across threads. This is mostly true, but Rust’s full thread-safety story hinges on two built-in traits, std::marker::Send and std::marker::Sync.
Types that implement Send are safe to pass by value to another thread. They can be moved across threads.
Types that implement Sync are safe to pass by non-mut reference to another thread. They can be shared across threads.
By safe here, we mean the same thing we always mean: free from data races and other undefined behavior.
For example, in the process_files_in_parallel example, we used a closure to pass a Vec<String> from the parent thread to each child thread. We didn’t point it out at the time, but this means the vector and its strings are allocated in the parent thread, but freed in the child thread. The fact that Vec<String> implements Send is an API promise that this is OK: the allocator used internally by Vec and String is thread-safe.
(If you were to write your own Vec and String types with fast but non-thread-safe allocators, you’d have to implement them using types that are not Send, such as unsafe pointers. Rust would then infer that your NonThreadSafeVec and NonThreadSafeString types are not Send and restrict them to single-threaded use. But that’s a rare case.)
As Figure 19-9 illustrates, most types are both Send and Sync. You don’t even have to use #[derive] to get these traits on structs and enums in your program. Rust does it for you. A struct or enum is Send if its fields are Send, and Sync if its fields are Sync.
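A minimal sketch of this inference (the types here are just illustration):

struct Config {                    // automatically Send and Sync:
    name: String,                  //   String is Send + Sync
    retries: u32,                  //   u32 is Send + Sync
}

struct NotThreadSafe {
    buffer: *mut u8,               // raw pointers are neither Send nor Sync,
}                                  // so neither is this struct

fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    assert_send_sync::<Config>();
    // assert_send_sync::<NotThreadSafe>();  // error: `*mut u8` cannot be
    //                                       // sent/shared between threads safely
}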
The few types that are not Send and Sync are mostly those that use mutability in a way that isn’t thread-safe. For example, consider std::rc::Rc<T>, the type of reference-counting smart pointers.
What would happen if you could share an Rc<String> across threads? If both threads happen to try to clone the Rc at the same time, as shown in Figure 19-10, we have a data race as both threads increment the shared reference count. The reference count could become inaccurate, leading to a use-after-free or double free later—undefined behavior.
Of course, Rust prevents this. Here’s the code to set up this data race:
use std::thread::spawn;
use std::rc::Rc;

fn main() {
    let rc1 = Rc::new("hello threads".to_string());
    let rc2 = rc1.clone();
    spawn(move || {  // error
        rc2.clone();
    });
    rc1.clone();
}
Rust refuses to compile it, giving a detailed error message:
error[E0277]: the trait bound `Rc<String>: std::marker::Send` is not satisfied
in `[closure@...]`
--> concurrency_send_rc.rs:10:5
|
10 | spawn(move || { // error
| ^^^^^ within `[closure@...]`, the trait `std::marker::Send` is not
| implemented for `Rc<String>`
|
= note: `Rc<String>` cannot be sent between threads safely
= note: required because it appears within the type `[closure@...]`
= note: required by `std::thread::spawn`
Now you can see how Send and Sync help Rust enforce thread safety. They appear as bounds in the type signature of functions like spawn that transfer data across thread boundaries. When you spawn a thread, the closure you pass must be Send, which means all the values it contains must be Send. Similarly, if you want to send values through a channel to another thread, the values must be Send.
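You can see these bounds in the declaration of std::thread::spawn itself:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where F: FnOnce() -> T,
          F: Send + 'static,
          T: Send + 'static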
Our inverted index builder is built as a pipeline. The code is clear enough, but it has us manually setting up channels and launching threads. By contrast, the iterator pipelines we built in Chapter 15 seemed to pack a lot more work into just a few lines of code. Can we build something like that for thread pipelines?
In fact, it would be nice if we could unify iterator pipelines and thread pipelines. Then our index builder could be written as an iterator pipeline. It might start like this:
documents.into_iter()
    .map(read_whole_file)
    .errors_to(error_sender)   // filter out error results
    .off_thread()              // spawn a thread for the above work
    .map(make_single_file_index)
    .off_thread()              // spawn another thread for stage 2
    ...
Traits allow us to add methods to standard library types, so we can actually do this. We start by writing a trait that declares the method we want:
use std::sync::mpsc;

pub trait OffThreadExt: Iterator {
    /// Transform this iterator into an off-thread iterator: the
    /// `next()` calls happen on a separate worker thread, so the
    /// iterator and the body of your loop run concurrently.
    fn off_thread(self) -> mpsc::IntoIter<Self::Item>;
}
Then we implement this trait for iterator types. It helps that mpsc::Receiver is already iterable.
use std::thread::spawn;

impl<T> OffThreadExt for T
    where T: Iterator + Send + 'static,
          T::Item: Send + 'static
{
    fn off_thread(self) -> mpsc::IntoIter<Self::Item> {
        // Create a channel to transfer items from the worker thread.
        let (sender, receiver) = mpsc::sync_channel(1024);

        // Move this iterator to a new worker thread and run it there.
        spawn(move || {
            for item in self {
                if sender.send(item).is_err() {
                    break;
                }
            }
        });

        // Return an iterator that pulls values from the channel.
        receiver.into_iter()
    }
}
The where clause in this code was determined via a process much like the one described in “Reverse-Engineering Bounds”. At first, we just had this:
impl<T: Iterator> OffThreadExt for T
That is, we wanted the implementation to work for all iterators. Rust was having none of it. Because we’re using spawn to move an iterator of type T to a new thread, we must specify T: Iterator + Send + 'static. Because we’re sending the items back over a channel, we must specify T::Item: Send + 'static. With these changes, Rust was satisfied.
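With the trait in scope, usage might look like this sketch (expensive_step and cheap_summary are hypothetical stand-ins):

// Assumes `OffThreadExt` from above has been brought into scope with `use`.
fn expensive_step(n: u64) -> u64 {
    (0..n).sum()                      // stand-in for real CPU-heavy work
}

fn cheap_summary(n: u64) -> u64 {
    n % 1_000
}

fn main() {
    let results: Vec<u64> = (0..10_000u64)
        .map(expensive_step)          // runs on a worker thread...
        .off_thread()                 // ...because of this call
        .map(cheap_summary)           // this map runs on the calling thread
        .collect();
    println!("{} results", results.len());
}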
This is Rust’s character in a nutshell: we’re free to add a concurrency power tool to almost every iterator in the language—but not without first understanding and documenting the restrictions that make it safe to use.
In this section, we used pipelines as our examples because pipelines are a nice, obvious way to use channels. Everyone understands them. They’re concrete, practical, and deterministic. Channels are useful for more than just pipelines, though. They’re also a quick, easy way to offer any asynchronous service to other threads in the same process.
For example, suppose you’d like to do logging on its own thread, as in Figure 19-8. Other threads could send log messages to the logging thread over a channel; since you can clone the channel’s Sender, many client threads can have senders that ship log messages to the same logging thread.
Running a service like logging on its own thread has advantages. The logging thread can rotate log files whenever it needs to. It doesn’t have to do any fancy coordination with the other threads. Those threads won’t be blocked. Messages will accumulate harmlessly in the channel for a moment until the logging thread gets back to work.
Channels can also be used for cases where one thread sends a request to another thread and needs to get some sort of response back. The first thread’s request can be a struct or tuple that includes a Sender, a sort of self-addressed envelope that the second thread uses to send its reply. This doesn’t mean the interaction must be synchronous. The first thread gets to decide whether to block and wait for the response or use the .try_recv() method to poll for it.
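A minimal sketch of that pattern (the request type and arithmetic here are just illustration):

use std::sync::mpsc::{channel, Sender};
use std::thread::spawn;

struct Request {
    input: u64,
    reply_to: Sender<u64>,    // the "self-addressed envelope"
}

fn main() {
    let (request_sender, request_receiver) = channel::<Request>();

    // The serving thread answers requests until every request Sender is gone.
    let server = spawn(move || {
        for request in request_receiver {
            let _ = request.reply_to.send(request.input * 2);
        }
    });

    // A client sends one request and blocks waiting for the reply.
    let (reply_sender, reply_receiver) = channel();
    request_sender
        .send(Request { input: 21, reply_to: reply_sender })
        .unwrap();
    assert_eq!(reply_receiver.recv().unwrap(), 42);

    drop(request_sender);     // hang up so the server's loop ends
    server.join().unwrap();
}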
The tools we’ve presented so far—fork-join for highly parallel computation, channels for loosely connecting components—are sufficient for a wide range of applications. But we’re not done.
We’ve shown three techniques for using threads in Rust: fork-join parallelism, channels, and shared mutable state with locks. Our aim has been to provide a good introduction to the pieces Rust provides, with a focus on how they can fit together into real programs.
Rust insists on safety, so from the moment you decide to write a multithreaded program, the focus is on building safe, structured communication. Keeping threads mostly isolated is a good way to convince Rust that what you’re doing is safe. It happens that isolation is also a good way to make sure what you’re doing is correct and maintainable. Again, Rust guides you toward good programs.
More important, Rust lets you combine techniques and experiment. You can iterate fast: arguing with the compiler gets you up and running correctly a lot faster than debugging data races.