最近OSとかCPUの勉強にハマっています。
そこで、並行処理や並列処理が出てきてイマイチ腹落ちしなかったのでまとめてみました(非同期処理も)。
(一旦立ち止まって大枠を理解するのが大事なんだなとやっとわかってきたので…)
プログラムの実行を一時停止せずに他の作業を進行させる処理のこと。
実行結果を待たずにどんどん次の処理へと進んでいき、I/Oバウンドな処理に使うと力を発揮しやすいです。
I/O処理、特にネットワーク通信が絡んでくる処理はそれなりに遅く、早いAPIでも大体1msほどかかります(ベンチマークとったわけではなく完全な肌感)。一方でCPUの処理はめちゃくちゃ早くナノ秒単位で終わります。
つまり、1msは1nsの100万倍であり、I/Oバウンドな処理を同期的に処理するのはかなり非効率であることがわかります。同時に処理を走らせた方が効率的であり、そのためには非同期処理が必要です。
Rust の標準ライブラリには非同期ランタイムが含まれていないため、Tokioのようなクレイトが必要になってきます。
Tokioが非同期ネットワーク通信、ファイル I/O、タイマー管理などを効率的に処理してくれるのでかなり重宝していますね。
以下のコードではtokio::join!
を使って 2 つのタスクを非同期(かつ並行)に実行しています。
1use tokio::time::{sleep, Duration};
2use tokio;
3
4async fn async_task(id: u32) -> String {
5 println!("タスク{} 開始", id);
6 // ここにI/O処理入る想定だがsleepで代用
7 sleep(Duration::from_secs(1)).await;
8 println!("タスク{} 終了", id);
9 format!("タスク{}の結果", id)
10}
11
12#[tokio::main]
13async fn main() {
14 let (result1, result2) = tokio::join!(async_task(1), async_task(2));
15 println!("結果1: {}", result1);
16 println!("結果2: {}", result2);
17}
シングルプロセスでも並行処理できますが、話を単純化するためマルチプロセスで話を進めます。
並行処理は、シングルCPUコアが複数プロセス(例えばプロセスAとプロセスB)を超高速で入れ替えながらプロセスを実行することを言います。
ここで重要なこととして、複数プロセスが「超高速で交互に入れ替えられながら実行されている」という事実であり、実際には同時に実行していないということですね。
切り替えが超高速すぎてあたかも同時に動いてるように見えるだけです。
次のコードはマルチプロセス、シングルスレッドの例です
1use std::process::{Command, Stdio};
2use std::{thread, time};
3
4fn main() {
5 let mut process_a = Command::new("echo")
6 .arg("プロセスAが実行中")
7 .stdout(Stdio::piped())
8 .spawn()
9 .expect("プロセスAの開始に失敗しました");
10 let mut process_b = Command::new("echo")
11 .arg("プロセスBが実行中")
12 .stdout(Stdio::piped())
13 .spawn()
14 .expect("プロセスBの開始に失敗しました");
15 for i in 0..5 {
16 if i % 2 == 0 {
17 println!("プロセスAのID: {}", process_a.id());
18 println!("プロセスAを実行中...");
19 let _ = process_a.wait().expect("プロセスAの終了待機に失敗");
20 println!("プロセスAが終了");
21 } else {
22 println!("プロセスBのID: {}", process_b.id());
23 println!("プロセスBを実行中...");
24 let _ = process_b.wait().expect("プロセスBの終了待機に失敗");
25 println!("プロセスBが終了");
26 }
27 // 次のプロセスの実行まで少し待機
28 thread::sleep(time::Duration::from_millis(500));
29 }
30
31 println!("両方のプロセスが終了しました");
32}
1プロセスAのID: 64073
2プロセスAを実行中...
3プロセスAが終了
4プロセスBのID: 64083
5プロセスBを実行中...
6プロセスBが終了
7プロセスAのID: 64073
8プロセスAを実行中...
9プロセスAが終了
10プロセスBのID: 64083
11プロセスBを実行中...
12プロセスBが終了
13プロセスAのID: 64073
14プロセスAを実行中...
15プロセスAが終了
16両方のプロセスが終了しました
こんな感じで交互に実行されていることがわかるかと思います。
ちなみに、マルチプロセスはメモリ空間がプロセスごとに完全に独立しているのでコンテキストスイッチコストがかかりますが、競合状態やデータの不整合などの問題が発生しにくいです。
最後に並列処理です。
話を単純化するためマルチプロセス、マルチスレッドで話を進めます。
並列処理は複数CPUコアを使って本当に同時にプロセスを動かします。これは完全に同時に処理が動いています。
なので、処理速度が大幅に向上し全体的なパフォーマンスが改善されますが問題ももちろんありまして…
それは複数スレッドを使う場合に同時、共有リソースにアクセスする際、データの整合性を保つための同期制御が必要になってくることですね。
以下はマルチプロセス、マルチスレッドのコードです。
1use std::process::{Command, Stdio, Child};
2use std::sync::{Arc, Mutex};
3use std::thread;
4
5fn main() {
6 // マルチプロセス: 別々の子プロセスを生成
7 // Arc<Mutex<>>を使用して複数スレッドから安全にアクセス可能にする
8 let process_a = Arc::new(Mutex::new(
9 Command::new("echo")
10 .arg("プロセスA")
11 .stdout(Stdio::piped())
12 .spawn()
13 .expect("プロセスAの起動に失敗"),
14 ));
15
16 let process_b = Arc::new(Mutex::new(
17 Command::new("echo")
18 .arg("プロセスB")
19 .stdout(Stdio::piped())
20 .spawn()
21 .expect("プロセスBの起動に失敗"),
22 ));
23
24 // マルチスレッド: 2つの独立したスレッドを生成して並列実行
25 let handle_a = thread::spawn({
26 // スレッド間でプロセスを共有するためにArcをクローン
27 let process_a = Arc::clone(&process_a);
28 move || {
29 for i in 0..5 {
30 println!("スレッドA: プロセスA 実行回数: {}", i + 1);
31 // Mutexを使用してプロセスへの排他的アクセスを確保
32 let mut guard_a = process_a.lock().unwrap();
33 // 現在のプロセスを取り出し、新しいプロセスと置き換え
34 let process_a_child: Child = std::mem::replace(&mut *guard_a, Command::new("echo").spawn().unwrap());
35 // ロックを解放
36 drop(guard_a);
37 // 子プロセスの実行と結果の待機
38 process_a_child.wait_with_output().expect("プロセスAの実行に失敗");
39 // プロセスを再起動
40 let mut guard_a = process_a.lock().unwrap();
41 *guard_a = Command::new("echo").arg("プロセスA").stdout(Stdio::piped()).spawn().expect("プロセスAの再起動に失敗");
42 }
43 }
44 });
45
46 // スレッドBも同様の処理を実行
47 let handle_b = thread::spawn({
48 let process_b = Arc::clone(&process_b);
49 move || {
50 for i in 0..5 {
51 println!("スレッドB: プロセスB 実行回数: {}", i + 1);
52 let mut guard_b = process_b.lock().unwrap();
53 let process_b_child: Child = std::mem::replace(&mut *guard_b, Command::new("echo").spawn().unwrap());
54 drop(guard_b);
55 process_b_child.wait_with_output().expect("プロセスBの実行に失敗");
56 let mut guard_b = process_b.lock().unwrap();
57 *guard_b = Command::new("echo").arg("プロセスB").stdout(Stdio::piped()).spawn().expect("プロセスBの再起動に失敗");
58 }
59 }
60 });
61 // 両方のスレッドの完了を待機(並列実行の同期ポイント)
62 handle_a.join().expect("スレッドAの終了に失敗");
63 handle_b.join().expect("スレッドBの終了に失敗");
64
65 println!("両方のスレッドとプロセスが終了しました");
66}
ちなみに、以下のコードはスレッド間の共有リソースへの同時アクセスを防ぐためにロックしています。データの一貫性を守るために必要な処理です。
1let mut guard_a = process_a.lock().unwrap();
非同期処理、並行処理、並列処理と色々ありますがどれも銀の弾丸ではなく、ユースケースやボトルネックに対し使い分ける必要があります。
I/Oバウンドな処理であれば非同期処理が適していると思いますし、CPUバウンドな処理であれば並行処理や並列処理が適している場合が多いです。
また、マルチプロセスのコンテキストスイッチによるコストがパフォーマンスに影響を与えている場合はプロセスを減らしてスレッドを増やすなどの工夫が必要になってきます。
それぞれの手法特性を把握して適切に使い分けることで、より効率的でパフォーマンスの高いアプリケーションを構築できます。今後もこれらの技術を深く掘り下げ、実践に活かしていきたいと思いました。