Stopping a Rust Worker
source link: https://matklad.github.io/2018/03/03/stopping-a-rust-worker.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Stopping a Rust Worker
This is a small post about a specific pattern for cancellation in the Rust programming language. The pattern is simple and elegant, but it’s rather difficult to come up with it by yourself.
Introducing a worker
To be able to stop a worker, we need to have one in the first place! So, let’s implement a model program.
The task is to read the output line-by-line, sending these lines to another thread for processing (echoing the line back, with ❤️). My solution looks like this:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;
fn main() {
let worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
worker.send(Msg::Echo(line))
.unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel();
thread::spawn(move || {
loop {
let msg = rx.recv().unwrap();
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
}
}
});
tx
}
The program seems to work:
λ cargo r Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs Running `target/debug/worker` hello hello ❤️ world world ❤️ Bye!
Stopping the worker, the obvious way
Now that we have a worker, let’s add a new requirement.
When the user types stop
, the worker (but not the program itself) should be halted.
How can we do this? The most obvious way is to add a new variant, Stop
, to the Msg
enum, and break out of the worker’s loop:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;
fn main() {
let worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
let msg = if line == "stop" {
Msg::Stop
} else {
Msg::Echo(line)
};
worker.send(msg)
.unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
Stop,
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel();
thread::spawn(move || {
loop {
let msg = rx.recv().unwrap();
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
Msg::Stop => break,
}
}
println!("The worker has stopped!");
});
tx
}
This works, but only partially:
λ cargo r Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs Running `target/debug/worker` hello hello ❤️ stop The worker has stopped! world thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', /checkout/src/libcore/result.rs:916:5 note: Run with `RUST_BACKTRACE=1` for a backtrace.
We can add more code to fix the panic, but let’s stop for a moment and try to invent a more elegant way to stop the worker. The answer will be below this beautiful Ukiyo-e print :-)
Dropping the microphone
The answer is: the cleanest way to cancel something in Rust is to drop it.
For our task, we can stop the worker by dropping the Sender
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;
fn main() {
let mut worker = Some(spawn_worker());
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
if line == "stop" {
drop(worker.take());
continue
};
if let Some(ref worker) = worker {
worker.send(Msg::Echo(line)).unwrap();
} else {
println!("The worker has been stopped!");
};
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel();
thread::spawn(move || {
while let Ok(msg) = rx.recv() {
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
}
}
println!("The worker has stopped!");
});
tx
}
Note the interesting parts of the solution:
-
no need to invent an additional message type,
-
the
Sender
is stored inside anOption
, so that we can drop it with the.take
method, -
the
Option
forces us to check if the worker is alive before sending a message.
More generally, previously the worker had two paths for termination: a normal
termination via the Stop
message and an abnormal termination after a panic
in recv
(which might happen if the parent thread panics and drops the Sender
).
Now there is a single code path for both cases. That means we can be surer that if
something somewhere dies with a panic then the shutdown will proceed in an
orderly fashion, it is not a special case anymore.
The only thing left to make this ultimately neat is to replace a hand-written while let
with a for
loop:
1
2
3
4
5
for msg in rx {
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
}
}
Am I awaited?
It’s interesting to see that the same pattern applies to the async version of the solution as well.
Async baseline:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
extern crate futures; // [dependencies] futures = "0.1"
use std::io::BufRead;
use std::thread;
use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};
fn main() {
let mut worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
worker = worker.send(Msg::Echo(line)).wait().unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel(1);
thread::spawn(move || {
rx.for_each(|msg| {
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
}
Ok(())
}).wait().unwrap()
});
tx
}
Async with a termination message:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
extern crate futures; // [dependencies] futures = "0.1"
use std::io::BufRead;
use std::thread;
use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};
fn main() {
let mut worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
let msg = if line == "stop" {
Msg::Stop
} else {
Msg::Echo(line)
};
worker = worker.send(msg).wait().unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
Stop,
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel(1);
thread::spawn(move || {
let _ = rx.for_each(|msg| {
match msg {
Msg::Echo(msg) => {
println!("{} ❤️", msg);
Ok(())
},
Msg::Stop => Err(()),
}
}).then(|result| {
println!("The worker has stopped!");
result
}).wait();
});
tx
}
λ cargo r Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs Running `target/debug/worker` hello hello ❤️ stop The worker has stopped! world thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("...")', /checkout/src/libcore/result.rs:916:5 note: Run with `RUST_BACKTRACE=1` for a backtrace.
Async with drop:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
extern crate futures; // [dependencies] futures = "0.1"
use std::io::BufRead;
use std::thread;
use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};
fn main() {
let mut worker = Some(spawn_worker());
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
if line == "stop" {
drop(worker.take());
continue;
};
if let Some(w) = worker {
worker = Some(w.send(Msg::Echo(line)).wait().unwrap())
} else {
println!("The worker has been stopped!");
}
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel(1);
thread::spawn(move || {
rx.for_each(|msg| {
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
}
Ok(())
}).map(|()| {
println!("The worker has stopped!");
}).wait().unwrap();
});
tx
}
λ cargo r Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs Running `target/debug/worker` hello hello ❤️ stop The worker has stopped! world The worker has been stopped! Bye!
Conclusion
So, yeah, this all was written just to say "in Rust, cancellation is `drop`" :-)
Discussion on /r/rust.
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK