アペフチ

Rocketでチャネルをうまく扱いたい

細々とRustの勉強を続けています。なんか、一応解決したけど、これでいいのかなあ、っていう問題に遭遇したので、もっといい方法あるよっていう方はぜひ教えてください。

Rocketでウェブアプリケーションを動かしつつ、リクエスト処理が終わったら別スレッドのやつからウェブフックとか送りたい、と思ってチャネルを使ってみたのですが、はじめうまくいきませんでした。

// ...
fn main() {
    let (sender, receiver): (mpsc::Sender<String>, mpsc::Receiver<String>) = mpsc::channel();
    let web thread::spawn(move || {
        rocket::ignite()
            .manage(sender)
            .mount("/", routes![process_request])
            .launch();
    });
    let webhook = thread::spawn(move || {
        start_webhook(receiver);
    });
    let _ = web.join();
    let _ = webhook.join();
}

#[post("/", data = "<file>")]
fn process_request(notifier: State<mpsc::Sender<String>>, file: Data) -> Result<> {
    // ...
}
// ...

と、RocketのState機能を使ってみようとしたところ、

error[E0277]: `std::sync::mpsc::Sender<std::string::String>` cannot be shared between threads safely

と怒られる。 State はパラメーターに Send トレイトと Sync トレイトを要求して(Struct rocket::State)、 SenderSync を実装しない(Struct std::sync::mpsc::Sender)ので型から見るとそれは分かるんですけど、では何故そんな要求になってるの?

というのはRocketはマルチスレッドで動くから、スレッドをまたがって共有するために必要なことだったんでしょう。ということは分かるけど、公式ドキュメントの「並行性」の章では Senderclone() して各スレッドに渡すサンプルが載ってるので釈然としません。(まあ、今はまず動かして、後からどんどん直していこうというつもりで作ってるから先に進むけど。)

Launching a URL Shortener in Rust using Rocket」とか「Fearless Concurrency with Rust」とか読んで、最終的にMutexを使って解決することにしましたが、あれ、今考えたら Mutex<mpsc::Sender<String>> なんて作るくらいだったらSyncSenderを使えば同じなのでは? 帰ったらやってみよう。

追記

Mutex<mpsc::Sender<String>>mpsc::SyncSender<String> にしたらコンパイル通ったし問題なく動いた!