Message Passing

Dear Computer

Chapter 13: Concurrency and Parallelism

Sharing memory across threads is complex, and simpler systems have been proposed. One of the most common is message passing, in which threads send arbitrary payloads of data back and forth to each other through special communication channels.

Rust provides a communication channel that can be written into by many worker threads and read from by one manager thread. It is in the mpsc module, which is short for multiple-producer, single-consumer. This program creates a channel whose payloads are strings:

Rust
use std::sync::mpsc;

fn main() {
  let (sender, receiver) = mpsc::channel::<String>();
}

The sender end of the channel is given to the worker threads, which will produce strings and write them into the channel with a statement like this:

Rust
sender.send(message).unwrap();

The send method returns a Result, which is an error only if the receiver has already been dropped. The unwrap call panics in that case. Meanwhile, the receiver end is retained by the main thread, which reads the strings from the workers with a statement like this:

Rust
let message = receiver.recv().unwrap();

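The recv method blocks until a message arrives and returns an error once every sender has been dropped and the channel is empty. The receiver also moonlights as an iterator, which means we may loop over it to read messages until the channel closes.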
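
To see how these results behave without files getting in the way, here is a minimal sketch with a single worker. The helper names collect_words and send_after_hangup are made up for illustration. The first shows that iterating over the receiver stops once the only sender is dropped; the second shows that send reports an error when nobody is left to receive.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: one worker sends a few words and the caller collects them.
fn collect_words() -> Vec<String> {
  let (sender, receiver) = mpsc::channel::<String>();

  let worker = thread::spawn(move || {
    for word in ["alpha", "beta", "gamma"] {
      // send returns Err only when the receiver has been dropped.
      if sender.send(word.to_string()).is_err() {
        break;
      }
    }
    // sender goes out of scope here, which closes the channel.
  });

  // Iteration ends once every sender is dropped and the queue is empty.
  let words: Vec<String> = receiver.iter().collect();
  worker.join().unwrap();
  words
}

// Hypothetical helper: sending after the receiver is gone yields an Err.
fn send_after_hangup() -> bool {
  let (sender, receiver) = mpsc::channel::<String>();
  drop(receiver);
  sender.send("lost".to_string()).is_err()
}

fn main() {
  println!("{:?}", collect_words());
  println!("send failed: {}", send_after_hangup());
}
```

With only one producer, the words arrive in the order they were sent. With several producers, the interleaving between threads is not predictable.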

Suppose we want to scan through three big files in parallel and find all tokens that meet some criterion, say, that they start with $. We create three threads and give each a clone of sender:

Rust
use std::thread;
use std::sync::mpsc;

fn main() {
  let (sender, receiver) = mpsc::channel::<String>();

  for i in 0..3 {
    let sender = sender.clone();
    thread::spawn(move || {
      // TODO: read file, send matching tokens
    });
  }
}

Inside each thread, we read in a big file. We'll assume the files are named big-file-0, big-file-1, and big-file-2. Probably, if these are really big, we would not slurp them up with read_to_string, as we do here. One way or another, we tokenize the file and send each token that meets the criteria:

Rust
use std::fs;
use std::thread;
use std::sync::mpsc;

fn main() {
  let (sender, receiver) = mpsc::channel::<String>();

  for i in 0..3 {
    let sender = sender.clone();
    thread::spawn(move || {
      let path = format!("big-file-{}", i);
      let text = fs::read_to_string(path).expect("read failed");
      let tokenizer = text.split_whitespace();
      for token in tokenizer {
        if token.starts_with('$') {
          sender.send(token.to_string()).unwrap();
        }
      }
    });
  }
}
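
If the files really are too big for read_to_string, one possible alternative is to stream them line by line through the BufRead trait. The sketch below is an assumption about how that might look, not part of the program above. The helpers send_matches and matches_in are hypothetical, and a byte slice stands in for a file so that the example runs on its own.

```rust
use std::io::BufRead;
use std::sync::mpsc::{self, Sender};

// Hypothetical helper: reads one line at a time and sends each matching token.
// For a real file, the reader could be BufReader::new(File::open(path)?).
fn send_matches<R: BufRead>(reader: R, sender: &Sender<String>) {
  for line in reader.lines() {
    let line = line.expect("read failed");
    for token in line.split_whitespace() {
      if token.starts_with('$') {
        sender.send(token.to_string()).unwrap();
      }
    }
  }
}

// Hypothetical helper: runs send_matches over in-memory text and gathers the
// results. The channel is unbounded, so sending before receiving is fine.
fn matches_in(text: &str) -> Vec<String> {
  let (sender, receiver) = mpsc::channel::<String>();
  send_matches(text.as_bytes(), &sender);
  drop(sender); // close the channel so that collect can finish
  receiver.iter().collect()
}

fn main() {
  let text = "pay $5 now\nno match here\n$a and $b";
  println!("{:?}", matches_in(text));
}
```

Only one line is held in memory at a time, at the cost of handling a possible read error on every line.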

That's it for the worker threads. Back in the main thread, we set up a for loop that receives the matching tokens and prints them:

Rust
use std::fs;
use std::thread;
use std::sync::mpsc;

fn main() {
  let (sender, receiver) = mpsc::channel::<String>();

  for i in 0..3 {
    let sender = sender.clone();
    thread::spawn(move || {
      let path = format!("big-file-{}", i);
      let text = fs::read_to_string(path).expect("read failed");
      let tokenizer = text.split_whitespace();
      for token in tokenizer {
        if token.starts_with('$') {
          sender.send(token.to_string()).unwrap();
        }
      }
    });
  }

  drop(sender);  // main still has a hook on sender

  for message in receiver {
    println!("{}", message);
  }
}

The call to drop releases main's hold on sender. The threads each get a clone of sender and release it when they finish. But the original sender is still held by main. If we don't release it, the channel never closes, and the for-loop blocks forever after the last message, waiting on a sender that will never send.
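
The effect of the original sender can be observed with recv_timeout, which gives up after a deadline instead of blocking forever. This is only a sketch. The helper channel_state is hypothetical, the workers send made-up tokens instead of reading files, and the 100 millisecond timeout is an arbitrary choice.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Hypothetical helper: reports what the receiver sees after all workers have
// finished, depending on whether main drops its own sender.
fn channel_state(drop_original: bool) -> RecvTimeoutError {
  let (sender, receiver) = mpsc::channel::<String>();

  let mut workers = Vec::new();
  for i in 0..3 {
    let sender = sender.clone();
    workers.push(thread::spawn(move || {
      sender.send(format!("$token-{}", i)).unwrap();
    }));
  }

  // Wait for the workers, whose clones are dropped when they finish.
  for worker in workers {
    worker.join().unwrap();
  }

  // Either release main's sender or keep it alive a little longer.
  let original = if drop_original {
    drop(sender);
    None
  } else {
    Some(sender)
  };

  // Drain the three messages that the workers sent.
  for _ in 0..3 {
    receiver.recv().unwrap();
  }

  // No messages remain, so this call must fail one way or the other.
  let state = receiver
    .recv_timeout(Duration::from_millis(100))
    .unwrap_err();
  drop(original);
  state
}

fn main() {
  println!("dropped: {:?}", channel_state(true));
  println!("kept:    {:?}", channel_state(false));
}
```

When the original sender is dropped, the receiver learns that the channel is disconnected, which is what ends a for-loop over it. When the sender is kept, the receiver can only time out, and a plain for-loop would wait indefinitely.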

Channels are compelling because they simplify the effort needed to send results back to the manager thread. The manager doesn't need to prepare any shared memory beforehand and wrap it up in an Arc and Mutex. More advanced sharing networks that allow multiple consumers are available through third-party crates.
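
For contrast, here is a rough sketch of the same token search done with shared memory. It is an illustration under assumptions, not code from this chapter. The helper shared_matches is hypothetical, and in-memory strings stand in for the files. The manager must build the shared vector up front, hand every worker a clone of the Arc, and join the workers before reading the results.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: gathers $-prefixed tokens into a shared vector.
fn shared_matches(texts: Vec<String>) -> Vec<String> {
  // The shared state has to exist before any worker starts.
  let matches = Arc::new(Mutex::new(Vec::<String>::new()));

  let mut workers = Vec::new();
  for text in texts {
    let matches = Arc::clone(&matches);
    workers.push(thread::spawn(move || {
      for token in text.split_whitespace() {
        if token.starts_with('$') {
          // Each push briefly locks the vector.
          matches.lock().unwrap().push(token.to_string());
        }
      }
    }));
  }

  // Results are complete only after every worker has been joined.
  for worker in workers {
    worker.join().unwrap();
  }

  // Sort so that the result does not depend on thread scheduling.
  let mut found: Vec<String> = matches.lock().unwrap().clone();
  found.sort();
  found
}

fn main() {
  let texts = vec![
    "pay $5 now".to_string(),
    "no match here".to_string(),
    "$a and $b".to_string(),
  ];
  println!("{:?}", shared_matches(texts));
}
```

The channel version needs no lock and lets the manager print tokens as they arrive, whereas this version has to wait for the joins.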
