Stopping a Rust Worker

source link: https://matklad.github.io/2018/03/03/stopping-a-rust-worker.html

Mar 3, 2018

This is a small post about a specific pattern for cancellation in the Rust programming language. The pattern is simple and elegant, but rather difficult to come up with on your own.

Introducing a worker

To be able to stop a worker, we need to have one in the first place! So, let’s implement a model program.

The task is to read standard input line by line and send each line to another thread for processing (echoing the line back, with ❤️). My solution looks like this:

use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;

fn main() {
    let worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        worker.send(Msg::Echo(line))
            .unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        loop {
            let msg = rx.recv().unwrap();
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
            }
        }
    });
    tx
}

The program seems to work:

λ cargo r
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/worker`
hello
hello ❤️
world
world ❤️
Bye!

Stopping the worker, the obvious way

Now that we have a worker, let’s add a new requirement.

When the user types stop, the worker (but not the program itself) should be halted.

How can we do this? The most obvious way is to add a new variant, Stop, to the Msg enum, and break out of the worker’s loop:

use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;

fn main() {
    let worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        let msg = if line == "stop" {
            Msg::Stop
        } else {
            Msg::Echo(line)
        };

        worker.send(msg)
            .unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
    Stop,
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        loop {
            let msg = rx.recv().unwrap();
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
                Msg::Stop => break,
            }
        }
        println!("The worker has stopped!");
    });
    tx
}

This works, but only partially:

λ cargo r
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/worker`
hello
hello ❤️
stop
The worker has stopped!
world
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', /checkout/src/libcore/result.rs:916:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.
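
The panic comes from `send` itself: once the worker has left its loop, the `Receiver` is dropped, and every later `send` returns an `Err` that carries the unsent value back. A minimal sketch of that behavior, with the worker thread replaced by an explicit `drop(rx)` (the function name is made up for this illustration):

```rust
use std::sync::mpsc::{channel, SendError};

/// Tries to send `line` into a channel whose receiver is already gone.
/// Hypothetical helper: `drop(rx)` stands in for the worker thread exiting.
fn send_after_receiver_dropped(line: &str) -> Result<(), SendError<String>> {
    let (tx, rx) = channel::<String>();
    drop(rx);
    // With no receiver left, `send` fails and hands the message back.
    tx.send(line.to_string())
}

fn main() {
    match send_after_receiver_dropped("world") {
        Ok(()) => println!("sent"),
        Err(SendError(line)) => println!("worker is gone, could not send {:?}", line),
    }
}
```

Matching on the result instead of calling `unwrap` would be one way to avoid the panic in `main`.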

We can add more code to fix the panic, but let’s stop for a moment and try to invent a more elegant way to stop the worker. The answer will be below this beautiful Ukiyo-e print :-)

[Image: Ukiyo-e print (100 views edo 008)]

Dropping the microphone

The answer is: the cleanest way to cancel something in Rust is to drop it. For our task, we can stop the worker by dropping the Sender:

use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;

fn main() {
    let mut worker = Some(spawn_worker());

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        if line == "stop" {
            drop(worker.take());
            continue
        };

        if let Some(ref worker) = worker {
            worker.send(Msg::Echo(line)).unwrap();
        } else {
            println!("The worker has been stopped!");
        };
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
            }
        }
        println!("The worker has stopped!");
    });
    tx
}

Note the interesting parts of the solution:

  • no need to invent an additional message type,

  • the Sender is stored inside an Option, so that we can drop it with the .take method,

  • the Option forces us to check if the worker is alive before sending a message.
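
The last two points can be bundled into a small wrapper. This is only a sketch: `Handle`, `new_handle`, `send` and `stop` are hypothetical names that do not appear in the program above, and the channel carries plain `String`s to keep the example short.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical wrapper around the sending side of the channel.
struct Handle {
    tx: Option<Sender<String>>,
}

impl Handle {
    /// Returns `true` if the line was handed to the channel,
    /// `false` if the worker was stopped or has gone away.
    fn send(&self, line: String) -> bool {
        match &self.tx {
            Some(tx) => tx.send(line).is_ok(),
            None => false,
        }
    }

    /// Cancels the worker by dropping the `Sender`; safe to call twice.
    fn stop(&mut self) {
        drop(self.tx.take());
    }
}

/// Creates a handle plus the receiving end that a worker would own.
fn new_handle() -> (Handle, Receiver<String>) {
    let (tx, rx) = channel();
    (Handle { tx: Some(tx) }, rx)
}

fn main() {
    let (mut handle, rx) = new_handle();
    println!("sent before stop: {}", handle.send("hello".to_string()));
    handle.stop();
    println!("sent after stop: {}", handle.send("world".to_string()));
    // The buffered message is still delivered; after it, `recv` reports the hang-up.
    println!("first recv ok: {}", rx.recv().is_ok());
    println!("second recv ok: {}", rx.recv().is_ok());
}
```

Because `take` leaves `None` behind, a second `stop` is a no-op, and `send` cannot reach a `Sender` that has already been dropped.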

More generally, the worker previously had two paths for termination: a normal one via the Stop message, and an abnormal one via a panic when unwrapping the result of recv (which happens if the parent thread panics and drops the Sender). Now there is a single code path for both cases. That means we can be more confident that if something somewhere dies with a panic, the shutdown will still proceed in an orderly fashion: it is not a special case anymore.
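
To see the single code path at work, here is a minimal sketch in which the thread holding the Sender panics. `spawn_collector` and `run_with_panicking_producer` are made-up names for this illustration, and the sketch assumes the default unwinding panic strategy (with `panic = "abort"` the whole process would exit instead).

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Spawns a worker that collects lines until every `Sender` is gone,
/// then returns what it has seen.
fn spawn_collector() -> (Sender<String>, JoinHandle<Vec<String>>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut seen = Vec::new();
        // Same loop as in the worker above: ends when the channel hangs up.
        while let Ok(line) = rx.recv() {
            seen.push(line);
        }
        seen
    });
    (tx, handle)
}

/// Lets a producer thread send one line and then panic.
/// Unwinding drops `tx`, which is what stops the worker.
fn run_with_panicking_producer() -> Vec<String> {
    let (tx, worker) = spawn_collector();
    let producer = thread::spawn(move || {
        tx.send("hello".to_string()).unwrap();
        panic!("producer died");
    });
    // The producer's panic is reported as an `Err` from `join`.
    assert!(producer.join().is_err());
    // The worker itself did not panic, so this `unwrap` succeeds.
    worker.join().unwrap()
}

fn main() {
    let seen = run_with_panicking_producer();
    println!("worker saw {:?} and exited cleanly", seen);
}
```

The producer's panic message still shows up on stderr, but the worker leaves its loop through the same `Err` from `recv` that an explicit `drop` would produce.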

The only thing left to make this ultimately neat is to replace a hand-written while let with a for loop:

for msg in rx {
    match msg {
        Msg::Echo(msg) => println!("{} ❤️", msg),
    }
}
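
The `for` loop works because a `Receiver` can be iterated directly, and the iteration ends once the channel has hung up. The sketch below puts the loop back into a complete worker. Returning a `JoinHandle` and counting messages are additions for this illustration (the program above does neither), and `echo_all` is a hypothetical helper that stands in for the stdin loop.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

enum Msg {
    Echo(String),
}

/// Spawns the echo worker; the handle yields the number of handled messages.
fn spawn_worker() -> (Sender<Msg>, JoinHandle<usize>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        // Runs until every `Sender` has been dropped.
        for msg in rx {
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
            }
            handled += 1;
        }
        println!("The worker has stopped!");
        handled
    });
    (tx, handle)
}

/// Sends every line, stops the worker by dropping the `Sender`,
/// and waits for the worker to finish.
fn echo_all(lines: &[&str]) -> usize {
    let (tx, handle) = spawn_worker();
    for line in lines {
        tx.send(Msg::Echo(line.to_string())).unwrap();
    }
    drop(tx);
    handle.join().unwrap()
}

fn main() {
    let handled = echo_all(&["hello", "world"]);
    println!("handled {} messages", handled);
}
```

Joining the handle after the drop is one way to make sure the worker's final message is printed before the program exits.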

Am I awaited?

It’s interesting to see that the same pattern applies to the async version of the solution as well.

Async baseline:

extern crate futures; // [dependencies] futures = "0.1"

use std::io::BufRead;
use std::thread;

use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};

fn main() {
    let mut worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        worker = worker.send(Msg::Echo(line)).wait().unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel(1);
    thread::spawn(move || {
        rx.for_each(|msg| {
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
            }
            Ok(())
        }).wait().unwrap()
    });
    tx
}

Async with a termination message:

extern crate futures; // [dependencies] futures = "0.1"

use std::io::BufRead;
use std::thread;

use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};

fn main() {
    let mut worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        let msg = if line == "stop" {
            Msg::Stop
        } else {
            Msg::Echo(line)
        };
        worker = worker.send(msg).wait().unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
    Stop,
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel(1);
    thread::spawn(move || {
        let _ = rx.for_each(|msg| {
            match msg {
                Msg::Echo(msg) => {
                    println!("{} ❤️", msg);
                    Ok(())
                },
                Msg::Stop => Err(()),
            }
        }).then(|result| {
            println!("The worker has stopped!");
            result
        }).wait();
    });
    tx
}

λ cargo r
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/worker`
hello
hello ❤️
stop
The worker has stopped!
world
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("...")', /checkout/src/libcore/result.rs:916:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.

Async with drop:

extern crate futures; // [dependencies] futures = "0.1"

use std::io::BufRead;
use std::thread;

use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};

fn main() {
    let mut worker = Some(spawn_worker());

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        if line == "stop" {
            drop(worker.take());
            continue;
        };

        if let Some(w) = worker {
            worker = Some(w.send(Msg::Echo(line)).wait().unwrap())
        } else {
            println!("The worker has been stopped!");
        }
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel(1);
    thread::spawn(move || {
        rx.for_each(|msg| {
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
            }
            Ok(())
        }).map(|()| {
            println!("The worker has stopped!");
        }).wait().unwrap();
    });
    tx
}

λ cargo r
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/worker`
hello
hello ❤️
stop
The worker has stopped!
world
The worker has been stopped!
Bye!

Conclusion

So, yeah, this all was written just to say "in Rust, cancellation is `drop`" :-)

Discussion on /r/rust.

