通道

通道(Channel)是一種容易讓使用許多執行緒能寄送(send)資料到某個地方的方式。它們相當流行,因為它們能相當簡單得和其它東西放在一起用。你可以在 Rust 中用 std::sync::mpsc 建立通道。mpsc 的意思是"多個生產者,單個消費者"(Multiple Producer, Single Consumer),也就是"許多執行緒寄送一個地方"。要啟動通道,你可以使用 channel()。這會建立被束縛在一起的 SenderReceiver。你可以在函式簽名中看到這一點:

#![allow(unused)]
fn main() {
// 🚧
pub fn channel<T>() -> (Sender<T>, Receiver<T>)
}

所以你要選擇一個名字給傳送者、另一個給接收者。通常你會看到像 let (sender, receiver) = channel(); 這樣的開頭。因為它是泛型的,如果你只寫這樣,Rust 會不知道型別:

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel(); // ⚠️
}

編譯器說:

error[E0282]: type annotations needed for `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`
  --> src\main.rs:30:30
   |
30 |     let (sender, receiver) = channel();
   |         ------------------   ^^^^^^^ cannot infer type for type parameter `T` declared on the function `channel`
   |         |
   |         consider giving this pattern the explicit type `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`, where
the type parameter `T` is specified

它建議為 SenderReceiver 加上型別。如果你想可以這樣做:

use std::sync::mpsc::{channel, Sender, Receiver}; // 在這加上 Sender 和 Receiver

fn main() {
    let (sender, receiver): (Sender<i32>, Receiver<i32>) = channel();
}

但你不必這樣做:一旦你開始使用 SenderReceiver,Rust 就能猜到型別。

所以讓我們來看一下使用通道最簡單的方式。

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();

    sender.send(5);
    receiver.recv(); // recv = receive, 不是 "rec v"
}

現在編譯器知道型別了。sender 的是 Result<(), SendError<i32>>receiver 的是 Result<i32, RecvError>。所以你可以用 .unwrap() 來看看是否有寄送到,或者用更好的錯誤處理。讓我們加上 .unwrap() 還有 println!,看看得到什麼:

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();

    sender.send(5).unwrap();
    println!("{}", receiver.recv().unwrap());
}

印出 5

channel 就像 Arc 一樣,因為你可以克隆它,並將克隆的內容寄送到其他執行緒中。讓我們做兩個執行緒,並將值寄送到 receiver。這段程式碼可以執行,但它並不是我們明確想要的那樣。

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();

    std::thread::spawn(move|| { // 移入 sender
        sender.send("Send a &str this time").unwrap();
    });

    std::thread::spawn(move|| { // 移入 sender_clone
        sender_clone.send("And here is another &str").unwrap();
    });

    println!("{}", receiver.recv().unwrap());
}

讓兩個執行緒開始寄送,然後我們用 println!。它可能會印出 Send a &str this time 或者 And here is another &str,這取決於哪個執行緒先完成。讓我們做出會合控制碼(join handle)來讓它們等待。

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();
    let mut handle_vec = vec![]; // 把我們的控制碼放在這

    handle_vec.push(std::thread::spawn(move|| {  // 把它推進向量裡
        sender.send("Send a &str this time").unwrap();
    }));

    handle_vec.push(std::thread::spawn(move|| {  // 還有把這個推進向量
        sender_clone.send("And here is another &str").unwrap();
    }));

    for _ in handle_vec { // 現在 handle_vec 裡有 2 個元素. 讓我們把它們印出來
        println!("{:?}", receiver.recv().unwrap());
    }
}

印出:

"Send a &str this time"
"And here is another &str"

現在讓我們做出 results_vec,而不是列印。

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();
    let mut handle_vec = vec![];
    let mut results_vec = vec![];

    handle_vec.push(std::thread::spawn(move|| {
        sender.send("Send a &str this time").unwrap();
    }));

    handle_vec.push(std::thread::spawn(move|| {
        sender_clone.send("And here is another &str").unwrap();
    }));

    for _ in handle_vec {
        results_vec.push(receiver.recv().unwrap());
    }

    println!("{:?}", results_vec);
}

現在結果在我們的向量中:["Send a &str this time", "And here is another &str"]

現在讓我們假設我們有很多工作要做,並且想要使用執行緒。我們有一百萬個元素的大向量,全部是 0,我們想把每個 0 都變成 1,我們將使用十個執行緒,每一個將負責十分之一的工作。我們還將建立新向量,並使用 .extend() 來收集結果。

use std::sync::mpsc::channel;
use std::thread::spawn;

fn main() {
    let (sender, receiver) = channel();
    let hugevec = vec![0; 1_000_000];
    let mut newvec = vec![];
    let mut handle_vec = vec![];

    for i in 0..10 {
        let sender_clone = sender.clone();
        let mut work: Vec<u8> = Vec::with_capacity(hugevec.len() / 10); // 新向量來收集結果. 1/10 的大小
        work.extend(&hugevec[i*100_000..(i+1)*100_000]); // 第一部份拿 0..100_000, 下一次拿 100_000..200_000, 以此類推.
        let handle =spawn(move || { // 做出控制碼

            for number in work.iter_mut() { // 做實際的工作
                *number += 1;
            };
            sender_clone.send(work).unwrap(); // 用 sender_clone 來寄送工作到 receiver
        });
        handle_vec.push(handle);
    }
    
    for handle in handle_vec { // 停止直到執行緒都完成工作
        handle.join().unwrap();
    }
    
    while let Ok(results) = receiver.try_recv() {
        newvec.push(results); // 從 receiver.recv() 推送結果進向量
    }

    // 現在我們有了 Vec<Vec<u8>>. 我們可以用 .flatten() 全部放在一起
    let newvec = newvec.into_iter().flatten().collect::<Vec<u8>>(); // 現在它是個有 1_000_000 個 u8 數字的向量
    
    println!("{:?}, {:?}, total length: {}", // 讓我們印出一些數字來確定它們全部都是 1
        &newvec[0..10], &newvec[newvec.len()-10..newvec.len()], newvec.len() // 以及證明大小是 1_000_000 個元素
    );
    
    for number in newvec { // 並且讓我們告訴 Rust 它可以恐慌, 如果有任何一個數字不是 1 的話
        if number != 1 {
            panic!();
        }
    }
}