通道
通道(Channel)是一種容易讓使用許多執行緒能寄送(send)資料到某個地方的方式。它們相當流行,因為它們能相當簡單得和其它東西放在一起用。你可以在 Rust 中用 std::sync::mpsc
建立通道。mpsc
的意思是"多個生產者,單個消費者"(Multiple Producer, Single Consumer),也就是"許多執行緒寄送一個地方"。要啟動通道,你可以使用 channel()
。這會建立被束縛在一起的 Sender
和 Receiver
。你可以在函式簽名中看到這一點:
#![allow(unused)] fn main() { // 🚧 pub fn channel<T>() -> (Sender<T>, Receiver<T>) }
所以你要選擇一個名字給傳送者、另一個給接收者。通常你會看到像 let (sender, receiver) = channel();
這樣的開頭。因為它是泛型的,如果你只寫這樣,Rust 會不知道型別:
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); // ⚠️ }
編譯器說:
error[E0282]: type annotations needed for `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`
--> src\main.rs:30:30
|
30 | let (sender, receiver) = channel();
| ------------------ ^^^^^^^ cannot infer type for type parameter `T` declared on the function `channel`
| |
| consider giving this pattern the explicit type `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`, where
the type parameter `T` is specified
它建議為 Sender
和 Receiver
加上型別。如果你想可以這樣做:
use std::sync::mpsc::{channel, Sender, Receiver}; // 在這加上 Sender 和 Receiver fn main() { let (sender, receiver): (Sender<i32>, Receiver<i32>) = channel(); }
但你不必這樣做:一旦你開始使用 Sender
和 Receiver
,Rust 就能猜到型別。
所以讓我們來看一下使用通道最簡單的方式。
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); sender.send(5); receiver.recv(); // recv = receive, 不是 "rec v" }
現在編譯器知道型別了。sender
的是 Result<(), SendError<i32>>
,receiver
的是 Result<i32, RecvError>
。所以你可以用 .unwrap()
來看看是否有寄送到,或者用更好的錯誤處理。讓我們加上 .unwrap()
還有 println!
,看看得到什麼:
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); sender.send(5).unwrap(); println!("{}", receiver.recv().unwrap()); }
印出 5
。
channel
就像 Arc
一樣,因為你可以克隆它,並將克隆的內容寄送到其他執行緒中。讓我們做兩個執行緒,並將值寄送到 receiver
。這段程式碼可以執行,但它並不是我們明確想要的那樣。
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); let sender_clone = sender.clone(); std::thread::spawn(move|| { // 移入 sender sender.send("Send a &str this time").unwrap(); }); std::thread::spawn(move|| { // 移入 sender_clone sender_clone.send("And here is another &str").unwrap(); }); println!("{}", receiver.recv().unwrap()); }
讓兩個執行緒開始寄送,然後我們用 println!
。它可能會印出 Send a &str this time
或者 And here is another &str
,這取決於哪個執行緒先完成。讓我們做出會合控制碼(join handle)來讓它們等待。
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); let sender_clone = sender.clone(); let mut handle_vec = vec![]; // 把我們的控制碼放在這 handle_vec.push(std::thread::spawn(move|| { // 把它推進向量裡 sender.send("Send a &str this time").unwrap(); })); handle_vec.push(std::thread::spawn(move|| { // 還有把這個推進向量 sender_clone.send("And here is another &str").unwrap(); })); for _ in handle_vec { // 現在 handle_vec 裡有 2 個元素. 讓我們把它們印出來 println!("{:?}", receiver.recv().unwrap()); } }
印出:
"Send a &str this time"
"And here is another &str"
現在讓我們做出 results_vec
,而不是列印。
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); let sender_clone = sender.clone(); let mut handle_vec = vec![]; let mut results_vec = vec![]; handle_vec.push(std::thread::spawn(move|| { sender.send("Send a &str this time").unwrap(); })); handle_vec.push(std::thread::spawn(move|| { sender_clone.send("And here is another &str").unwrap(); })); for _ in handle_vec { results_vec.push(receiver.recv().unwrap()); } println!("{:?}", results_vec); }
現在結果在我們的向量中:["Send a &str this time", "And here is another &str"]
。
現在讓我們假設我們有很多工作要做,並且想要使用執行緒。我們有一百萬個元素的大向量,全部是 0,我們想把每個 0 都變成 1,我們將使用十個執行緒,每一個將負責十分之一的工作。我們還將建立新向量,並使用 .extend()
來收集結果。
use std::sync::mpsc::channel; use std::thread::spawn; fn main() { let (sender, receiver) = channel(); let hugevec = vec![0; 1_000_000]; let mut newvec = vec![]; let mut handle_vec = vec![]; for i in 0..10 { let sender_clone = sender.clone(); let mut work: Vec<u8> = Vec::with_capacity(hugevec.len() / 10); // 新向量來收集結果. 1/10 的大小 work.extend(&hugevec[i*100_000..(i+1)*100_000]); // 第一部份拿 0..100_000, 下一次拿 100_000..200_000, 以此類推. let handle =spawn(move || { // 做出控制碼 for number in work.iter_mut() { // 做實際的工作 *number += 1; }; sender_clone.send(work).unwrap(); // 用 sender_clone 來寄送工作到 receiver }); handle_vec.push(handle); } for handle in handle_vec { // 停止直到執行緒都完成工作 handle.join().unwrap(); } while let Ok(results) = receiver.try_recv() { newvec.push(results); // 從 receiver.recv() 推送結果進向量 } // 現在我們有了 Vec<Vec<u8>>. 我們可以用 .flatten() 全部放在一起 let newvec = newvec.into_iter().flatten().collect::<Vec<u8>>(); // 現在它是個有 1_000_000 個 u8 數字的向量 println!("{:?}, {:?}, total length: {}", // 讓我們印出一些數字來確定它們全部都是 1 &newvec[0..10], &newvec[newvec.len()-10..newvec.len()], newvec.len() // 以及證明大小是 1_000_000 個元素 ); for number in newvec { // 並且讓我們告訴 Rust 它可以恐慌, 如果有任何一個數字不是 1 的話 if number != 1 { panic!(); } } }