Message Passing
Sharing memory across threads is complex, and simpler systems have been proposed. One of the most common is message passing, in which threads send arbitrary payloads of data back and forth to each other through special communication channels.
Rust provides a communication channel that can be written into by many worker threads and read from by one manager thread. It is in the mpsc
module, which is short for multiple-producer, single-consumer. This program creates a channel whose payloads are strings:
use std::sync::mpsc;
fn main() {
let (sender, receiver) = mpsc::channel::<String>();
}
use std::sync::mpsc; fn main() { let (sender, receiver) = mpsc::channel::<String>(); }
The sender
end of the channel is given to the worker threads, which will produce strings and write them into the channel with a statement like this:
sender.send(message).unwrap();
sender.send(message).unwrap();
The send
method returns a Result
. The unwrap
call panics on an error. Meanwhile, the receiver
end is retained by the main thread, which reads the strings from the workers with a statement like this:
let message = receiver.recv().unwrap();
let message = receiver.recv().unwrap();
The receiver moonlights as an iterator, which means we may loop over it to read messages indefinitely.
Suppose we want to scan through three big files in parallel and find all tokens that meet some criteria—say they start with $
. We create three threads and give each a clone of sender
:
use std::thread;
use std::sync::mpsc;
fn main() {
let (sender, receiver) = mpsc::channel::<String>();
for i in 0..3 {
let sender = sender.clone();
thread::spawn(move || {
// TODO: read file, send matching tokens
});
}
}
use std::thread; use std::sync::mpsc; fn main() { let (sender, receiver) = mpsc::channel::<String>(); for i in 0..3 { let sender = sender.clone(); thread::spawn(move || { // TODO: read file, send matching tokens }); } }
Inside each thread, we read in a big file. We'll assume the files are named big-file-0
, big-file-1
, and big-file-2
. Probably, if these are really big, we would not slurp them up with read_to_string
, as we do here. One way or another, we tokenize the file and send each token that meets the criteria:
use std::fs;
use std::thread;
use std::sync::mpsc;
fn main() {
let (sender, receiver) = mpsc::channel::<String>();
for i in 0..3 {
let sender = sender.clone();
thread::spawn(move || {
let path = format!("big-file-{}", i);
let text = fs::read_to_string(path).expect("read failed");
let tokenizer = text.split_whitespace();
for token in tokenizer {
if token.starts_with('$') {
sender.send(token.to_string()).unwrap();
}
}
});
}
}
use std::fs; use std::thread; use std::sync::mpsc; fn main() { let (sender, receiver) = mpsc::channel::<String>(); for i in 0..3 { let sender = sender.clone(); thread::spawn(move || { let path = format!("big-file-{}", i); let text = fs::read_to_string(path).expect("read failed"); let tokenizer = text.split_whitespace(); for token in tokenizer { if token.starts_with('$') { sender.send(token.to_string()).unwrap(); } } }); } }
That's it for the worker threads. Back in the main thread, we set up a for loop that receives the matching tokens and prints them:
use std::fs;
use std::thread;
use std::sync::mpsc;
fn main() {
let (sender, receiver) = mpsc::channel::<String>();
for i in 0..3 {
let sender = sender.clone();
thread::spawn(move || {
let path = format!("big-file-{}", i);
let text = fs::read_to_string(path).expect("read failed");
let tokenizer = text.split_whitespace();
for token in tokenizer {
if token.starts_with('$') {
sender.send(token.to_string()).unwrap();
}
}
});
}
drop(sender); // main still has a hook on sender
for message in receiver {
println!("{}", message);
}
}
use std::fs; use std::thread; use std::sync::mpsc; fn main() { let (sender, receiver) = mpsc::channel::<String>(); for i in 0..3 { let sender = sender.clone(); thread::spawn(move || { let path = format!("big-file-{}", i); let text = fs::read_to_string(path).expect("read failed"); let tokenizer = text.split_whitespace(); for token in tokenizer { if token.starts_with('$') { sender.send(token.to_string()).unwrap(); } } }); } drop(sender); // main still has a hook on sender for message in receiver { println!("{}", message); } }
The call to drop
releases main's hold on sender
. The threads each get a clone of sender
and release it when they finish. But the original sender
is still held by main. If we don't release it, the for-loop will block because there's a chance that more messages will be sent.
Channels are compelling because they simplify the effort needed to send results back to the manager thread. The manager doesn't need to prepare any shared memory beforehand and wrap it up in an Arc
and Mutex
. More advanced sharing networks that allow multiple consumers are available through crates.