Shared Memory
Worker threads need access to the shared memory that holds the data they must process. They also need a place to store their results so that the main thread can collect them. This memory may be accessed in several ways.
The shared data could be static, defined either in the global scope—which is our only choice in a C program—or as static fields inside a class or module.
In Ruby and Java, a flow given to a Thread instance has closure semantics. That means it can read variables from the surrounding scope and, in Ruby, write to them. Java only lets a lambda capture local variables that are effectively final, though it may still mutate the objects those variables refer to. Alternatively, we can make a new subclass of Thread and have it store references to the shared state in instance variables. However we store the data, when multiple threads can simultaneously access shared data, we must ensure that threads don't clobber or disrupt each other's work.
Closures in Rust have different semantics than in Ruby and Java. Free variables are borrowed by default. If those free variables live only in temporary stack storage, then their owners may be released before the borrowing thread finishes. The compiler therefore rejects closures passed to thread::spawn that borrow local state. For example, consider this Rust program that attempts to share the iterator value thread_index with the closures:
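use std::thread;
fn main() {
    // Collect eagerly: map is lazy, so without collect the loop below
    // would spawn and join the threads one at a time.
    let threads: Vec<_> = (0..4).map(|thread_index| {
        thread::spawn(|| {
            println!("{}", thread_index); // compile error: thread_index is only borrowed
        })
    }).collect();
    for thread in threads {
        thread.join().expect("join failed");
    }
}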
Variable thread_index may not live as long as the thread, so the compiler rejects the closure that borrows it. One solution is to let the thread take ownership of the index by qualifying the closure with the move modifier:
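use std::thread;
fn main() {
    // Collect eagerly: map is lazy, so without collect the loop below
    // would spawn and join the threads one at a time.
    let threads: Vec<_> = (0..4).map(|thread_index| {
        thread::spawn(move || {
            println!("{}", thread_index);
        })
    }).collect();
    for thread in threads {
        thread.join().expect("join failed");
    }
}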
Variable thread_index is no longer borrowed but is moved to the thread's closure.
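Because thread_index is an integer, moving it amounts to copying it. The transfer of ownership is easier to see with a value that cannot be copied implicitly. The following is a minimal sketch rather than part of the running example: the function greet_on_thread and its strings are made up for illustration. It moves a String into a thread and receives the thread's result through join.

```rust
use std::thread;

// Hypothetical helper: moves `name` into a spawned thread and returns
// the value that the thread's closure produces.
fn greet_on_thread(name: String) -> String {
    let handle = thread::spawn(move || {
        // The closure owns `name` now; the spawning function does not.
        format!("hello, {}", name)
    });
    // Using `name` at this point would be a compile error: it was moved.
    handle.join().expect("join failed")
}

fn main() {
    let greeting = greet_on_thread(String::from("worker"));
    println!("{}", greeting);
}
```

Removing the move modifier from this sketch brings back the same kind of borrowing error as before, since the closure would then only borrow name.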
A consequence of using move is that data can no longer be shared amongst multiple threads because of Rust's policy of single ownership. To achieve sharing, we need help from an abstraction that permits multiple ownership. Earlier we learned about sharing ownership with Rc. An Rc instance is not safe to use with threads because its reference count is not updated atomically, but its relative Arc is.
Suppose we have a function that returns a vector, like this load_numbers function:
fn load_numbers() -> Vec<u32> {
    vec![1, 2, 3, 4, 5, 6, 7, 8]
}
Imagine that the vector is much longer than it is, and that we need to sum all of the vector's elements as quickly as possible. We decide to break the vector into four logical chunks and have each chunk summed by a thread. This is how our code starts:
fn main() {
    let xs = load_numbers();
    let thread_count = 4;
    // Collect so that all threads are spawned before any is joined.
    let threads: Vec<_> = (0..thread_count).map(|thread_index| {
        // TODO: spawn thread
    }).collect();
    for thread in threads {
        thread.join().expect("join failed");
    }
}
All four threads need access to xs, which is currently owned by the main function's stack frame. That's a problem that move fixes. However, the vector must be shared, so it cannot be moved into any single thread. It must be wrapped up in an Arc, which moves it to long-term heap storage and allows shared ownership. The Arc is then cloned for each thread:
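use std::sync::Arc;
fn main() {
    let xs = Arc::new(load_numbers());
    let thread_count = 4;
    // Collect so that all threads are spawned before any is joined.
    let threads: Vec<_> = (0..thread_count).map(|thread_index| {
        // Cloning the Arc makes another owner of the same vector.
        let xs = Arc::clone(&xs);
        // TODO: spawn thread
    }).collect();
    for thread in threads {
        thread.join().expect("join failed");
    }
}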
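Cloning an Arc does not copy the vector. It only creates another owner of the same heap allocation and increments a reference count. The sketch below is an illustration under assumed names (observe_counts is hypothetical). It uses Arc::strong_count to observe the count before the clone, after the clone, and after the thread that received the clone has been joined.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: returns the strong counts seen initially,
// after one clone, and after the thread holding the clone finishes.
fn observe_counts() -> (usize, usize, usize) {
    let xs = Arc::new(vec![1u32, 2, 3]);
    let initial = Arc::strong_count(&xs);

    // The clone shares the vector; no elements are copied.
    let xs_for_thread = Arc::clone(&xs);
    let after_clone = Arc::strong_count(&xs);

    let handle = thread::spawn(move || {
        // The closure owns the clone and drops it when it returns.
        xs_for_thread.len()
    });
    handle.join().expect("join failed");
    let after_join = Arc::strong_count(&xs);

    (initial, after_clone, after_join)
}

fn main() {
    println!("{:?}", observe_counts());
}
```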
Each Arc clone may be safely moved into its thread, which figures out its range of indices and sums up its elements:
use std::thread;
use std::sync::Arc;
fn main() {
    let xs = Arc::new(load_numbers());
    let thread_count = 4;
    // Collect eagerly: map is lazy, so without collect the loop below
    // would spawn and join the threads one at a time.
    let threads: Vec<_> = (0..thread_count).map(|thread_index| {
        let xs = Arc::clone(&xs);
        thread::spawn(move || {
            // Each thread sums the slice of indices assigned to it.
            let chunk_size = xs.len() / thread_count;
            let start = chunk_size * thread_index;
            let end = start + chunk_size;
            let mut sum = 0;
            for item_index in start..end {
                sum += xs[item_index];
            }
            println!("{}", sum);
        })
    }).collect();
    for thread in threads {
        thread.join().expect("join failed");
    }
}
An Arc is automatically dereferenced. We can call vector methods like len and [] on it directly. When a thread finishes, its clone is dropped and the reference count decrements. The main function still holds the original Arc, so the count reaches 0, and the vector is released from the heap, only once that Arc is also dropped at the end of main.
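The chunk arithmetic in the program above assumes that the vector's length divides evenly by the thread count, which holds for 8 elements and 4 threads. With other lengths, the trailing elements would be skipped. One possible adjustment, sketched here with a hypothetical helper named chunk_range that assumes a nonzero count, is to let the last chunk absorb the leftovers.

```rust
use std::ops::Range;

// Hypothetical helper: the index range for chunk `index` out of `count`
// chunks over `len` elements. The last chunk takes any leftover elements.
// Assumes `count` is greater than zero.
fn chunk_range(len: usize, count: usize, index: usize) -> Range<usize> {
    let chunk_size = len / count;
    let start = chunk_size * index;
    let end = if index == count - 1 { len } else { start + chunk_size };
    start..end
}

fn main() {
    // Ten elements do not divide evenly among four chunks.
    let xs: Vec<u32> = (1..=10).collect();
    let thread_count = 4;
    for thread_index in 0..thread_count {
        let range = chunk_range(xs.len(), thread_count, thread_index);
        let sum: u32 = xs[range.clone()].iter().sum();
        println!("{:?} sums to {}", range, sum);
    }
}
```

Inside a thread's closure, the returned range could replace the start and end computations. This leaves the last thread with slightly more work, which is a simple trade-off rather than the most balanced split.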
The threads currently only print their individual sums, which is not helpful to either the user or other code. The four sums must be summed into a grand total. One way to do this is to declare a shared total onto which each thread can add its sum. Because all threads need access to it, total must be wrapped up in an Arc. However, it must also be mutable.
Shared mutable state is trouble. Reads and writes may be interleaved or interrupted in ways that leave the data in a nonsensical state. Imagine driving to the store, and while you are busy shopping, your partner arrives at the store and drives off with the car. Because you both have keys that grant you full access to the car, you are capable of interfering with each other's work.
One way to avoid the troubles of shared mutable state is to automatically lock the data when a thread gains access to it. If the data is already locked by another thread, then the requesting thread waits for the data to be unlocked. Rust's Mutex provides such a lock.
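To see the locking behavior in isolation, consider this minimal sketch. The function count_with_mutex and its parameters are assumptions made for illustration. Several threads repeatedly add 1 to a shared counter, and because each addition happens while the lock is held, no increment is lost.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical demo: `thread_count` threads each add 1 to a shared
// counter `increments` times, locking the mutex for every addition.
fn count_with_mutex(thread_count: usize, increments: u32) -> u32 {
    let counter = Arc::new(Mutex::new(0u32));
    let handles: Vec<_> = (0..thread_count)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments {
                    // lock waits until no other thread holds the lock.
                    let mut guard = counter.lock().expect("lock failed");
                    *guard += 1;
                    // The guard is dropped here, which unlocks the mutex.
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("join failed");
    }
    // Copy the value out; the temporary guard is released after this statement.
    let final_count = *counter.lock().expect("lock failed");
    final_count
}

fn main() {
    println!("{}", count_with_mutex(4, 1000));
}
```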
This final version of the program allocates total on the heap and wraps it up in both a Mutex and an Arc. Each thread computes its sum, acquires the lock, and accumulates its sum onto total:
use std::thread;
use std::sync::{Arc, Mutex};
fn main() {
    let xs = Arc::new(load_numbers());
    let total = Arc::new(Mutex::new(0));
    let thread_count = 4;
    // Collect eagerly: map is lazy, so without collect the loop below
    // would spawn and join the threads one at a time.
    let threads: Vec<_> = (0..thread_count).map(|thread_index| {
        let xs = Arc::clone(&xs);
        let total = Arc::clone(&total);
        thread::spawn(move || {
            let chunk_size = xs.len() / thread_count;
            let start = chunk_size * thread_index;
            let end = start + chunk_size;
            let mut sum = 0;
            for item_index in start..end {
                sum += xs[item_index];
            }
            // Lock only after the local sum is ready, so the lock is held briefly.
            let mut total = total.lock().expect("lock failed");
            *total += sum;
        })
    }).collect();
    for thread in threads {
        thread.join().expect("join failed");
    }
    let total = total.lock().expect("lock failed");
    println!("{}", total);
}
The lock method returns a guard that must be dereferenced in order to mutate the value it protects. When the guard goes out of scope at the end of the thread's closure, the mutex is automatically unlocked. When all threads finish, the main function acquires the lock to get at the final result.
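The link between the guard's scope and the state of the lock can be checked with try_lock, which returns an error instead of waiting when the lock is unavailable. This is a single-threaded sketch under assumed names (lock_availability is hypothetical), meant only to show when the lock is released.

```rust
use std::sync::Mutex;

// Hypothetical helper: reports whether try_lock succeeds while a guard
// is alive, and again after that guard has gone out of scope.
fn lock_availability() -> (bool, bool) {
    let total = Mutex::new(0u32);
    let while_held;
    {
        let mut guard = total.lock().expect("lock failed");
        *guard += 5;
        // The guard still holds the lock, so this attempt fails.
        while_held = total.try_lock().is_ok();
    } // The guard goes out of scope here, unlocking the mutex.
    let after_drop = total.try_lock().is_ok();
    (while_held, after_drop)
}

fn main() {
    println!("{:?}", lock_availability());
}
```

Wrapping the locked region in its own block, as done here, is one way to release a lock before the end of a longer closure.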