Многопоточность - это сложно. C ++ тоже сложен. Я продемонстрирую, как мы можем хотя бы немного упростить многопоточность, избегая некоторых ловушек C ++, используя вместо этого Rust.

Некоторые из перечисленных здесь примеров C ++ являются модифицированными версиями примеров из Топ-20 ошибок многопоточности C ++ и способы их избежать Деба Хайдара, другие придуманы мной.

Имейте в виду, что все перечисленные здесь примеры являются игрушечными: в реальных кодовых базах многопоточность обычно намного сложнее, а предотвращение этих ошибок в C ++ может быть намного сложнее. В Rust компилятор по-прежнему будет проверять, что вы делаете, даже в этих сложных ситуациях, не позволяя вам сделать эти ошибки.

1. Условия гонки

#include <thread>
#include <iostream>
#include <string>
#include <thread>
int main() {
    auto data = std::string{"Hello, world!"};
    auto thread1 = std::thread([&] {
        data = std::string{"AAAAAAAAAAAAAAAAAAAAAAAA!"};
    });
    auto thread2 = std::thread([&] {
        for (auto&& c : data) {
            c += 1;
        }
    });
    thread1.join();
    thread2.join();
    std::cout << data << '\n';
}

В C ++ ничто не мешает нам ввести условия гонки, а в худшем случае мы даже обращаемся к недействительной памяти. Если thread1 обращается к data во время thread2 цикла, поток 2 может легко получить доступ к памяти, которая была освобождена. В больших базах кода может быть трудно определить, какие методы классов вызываются из нескольких потоков, даже если мы с осторожностью относимся к использованию std::atomic или std::mutex в противном случае.

fn main() {
    let mut data = "Hello, world!".to_owned();
    // error[E0373]: closure may outlive the current function,
    // but it borrows `data`, which is owned by the current function
    std::thread::spawn(||
        data = "AAAAAAAAAAAAAAAAAAAAAAAA!".to_owned()
    );
    // error[E0502]: cannot borrow `data` as immutable
    // because it is also borrowed as mutable
    // error[E0373]: closure may outlive the current function,
    // but it borrows `data`, which is owned by the current function
    std::thread::spawn(|| {
        for x in data.chars() {
            println!("{}", x);
        }
    });
}

Rust не позволит нам этого сделать. В Rust любая переменная может иметь неограниченное количество неизменяемых ссылок или одну изменяемую ссылку. Это означает, что у нас не может быть такого рода ошибок состояния гонки в безопасном коде Rust. У нас есть несколько способов сделать это правильно; давайте посмотрим на мьютексы и каналы.

use std::sync::{Arc, Mutex};
fn main() {
    let shared_data = Arc::new(Mutex::new(
        "AAAAAAAAAAAAAAAAAAAAAAAA!".to_owned()
    ));
    let t1 = {
        let shared_data = shared_data.clone();
        std::thread::spawn(move || {
            let mut data = shared_data.lock().unwrap();
            *data = "Hello, world!\n".to_owned();
        })
    };
    let t2 = {
        let shared_data = shared_data.clone();
        std::thread::spawn(move || {
            let mut data = shared_data.lock().unwrap();
            *data = "Goodbye, world!\n".to_owned();
        })
    };
    t1.join().unwrap();
    t2.join().unwrap();
    println!("{}", shared_data.lock().unwrap());
}

В этом примере мы используем потокобезопасный указатель подсчета ссылок для хранения наших данных, чтобы гарантировать, что наши данные живут достаточно долго (попробуйте использовать однопоточный указатель подсчета ссылок в этом примере, чтобы увидеть, сможем ли мы получить состояние гонки внутри общего указателя), даже если мы решим отсоединить наши потоки. Если бы мы передавали только неизменяемые данные, этого было бы достаточно. Но поскольку мы хотим изменить наше разделяемое состояние, нам нужно обернуть его в один из примитивов синхронизации Rust (см. std::sync::Mutex, parking_lot::Mutex, RwLock и атомарные типы).

use std::{sync::mpsc::channel, thread};
fn main() {
    let (s, r) = channel();
    let (s2, r2) = channel();
    thread::spawn(move || s.send("Hello, ".to_owned()).unwrap());
    thread::spawn(move || s2.send("world!").unwrap());
    let message = r.recv().unwrap() + r2.recv().unwrap();
    println!("{}", message);
}

Вместо этого для передачи данных между потоками мы можем использовать каналы. Эти каналы позволят нам отправлять только те типы, которые безопасны для отправки; если мы попытаемся отправить тип, который небезопасно использовать в потоках (например, Rc), мы получим ошибку компилятора. Для еще более быстрой реализации каналов с участием многих производителей и многих потребителей см. crossbeam_channel.

2. Срок службы и ссылки

#include <string>
#include <thread>
int main() {
    {
        auto data = std::string{"DATA"};
        std::thread([&] { data.push_back('!'); }).detach();
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
}

В этом примере порождаемый нами поток может обращаться к недопустимой памяти - деструктор данных мог быть вызван к тому моменту, когда он начал доступ к переменной. Нам нужно убедиться, что данные, которые мы используем в наших потоках, переживают сам поток. Во многих различных сценариях (и не только в многопоточных) мы передаем ссылки на данные в C ++, которые должны оставаться в живых, пока пользователь этой ссылки не закончит с ними, и у нас нет автоматического способа проверить, что это так.

fn main() {
    let data = "AAAAAAAAAAAAAAAAAAAAAAAA!".to_owned();
    // error[E0373]: closure may outlive the current function,
    // but it borrows `data`, which is owned by the current function
    std::thread::spawn(|| println!("{}", data));
}

Как мы видели в примере из части 1, компилятор будет жаловаться на то, что заимствованные переменные не живут достаточно долго. Поскольку потоки могут быть отсоединены, ссылки на данные в этих потоках указывают на то, что они должны существовать на протяжении всей программы.

use std::sync::Arc;
fn main() {
    let data = Arc::new("AAAAAAAAAAAAAAAAAAAAAAAA!".to_owned());
    {
        let data = data.clone();
        std::thread::spawn(move || println!("{}", data));
    }
}

Как мы видели в предыдущих примерах, мы можем использовать общие указатели, чтобы гарантировать, что наши данные живут достаточно долго.

use crossbeam_utils::thread;
fn main() {
    let data = "DATA".to_owned();
    thread::scope(|s| {
        let data = &data;
        for c in data.chars() {
            s.spawn(move |_| {
                println!("{}: {}", data, c);
            });
        }
    }).unwrap();
}

Мы также можем использовать потоки с заданной областью действия из crossbeam-utils, которые гарантируют, что они будут соединены до того, как их область действия выйдет за пределы области действия.

3. Обработка ошибок из потоков

#include <exception>
#include <iostream>
#include <stdexcept>
#include <thread>
static std::exception_ptr ep = nullptr;
int main() {
    std::thread([] {
        try {
            throw std::runtime_error("error");
        } catch (...) {
            ep = std::current_exception();
        }
    }).join();
    if (ep) {
        try {
            std::rethrow_exception(ep);
        } catch(std::runtime_error& e) {
            std::cout << e.what() << "\n";
        }
    }
}

В приведенном выше примере показан минимальный код для обработки исключений из одного потока. Я думаю, будет справедливо сказать, что обработка ошибок из потоков в C ++ довольно сложна.

fn main() {
    let thread = std::thread::spawn(|| {
        // ...
        // change the integer here to see the other results
        match 0 {
            0 => Ok("ok"),
            1 => Err("error"),
            _ => panic!("something went wrong"),
        }
    });
    match thread.join() {
        // Ok return, no panic
        Ok(Ok(x)) => println!("OK {}", x),
        // Error return, no panic
        Ok(Err(x)) => println!("Error: {}", x),
        Err(_) => println!("Thread panicked"),
    }
}

В Rust мы можем выбрать обработку паники из потоков (или просто вызвать .unwrap() для завершения, если мы утверждаем, что поток не может паниковать), и мы можем вернуть Result<T, E>, чтобы сигнализировать о том, что поток может выйти из строя. Обратите внимание, что для большинства потоков нам не нужно обрабатывать ни один из них.

4. Присоединение () и отсоединение () потоков

#include <iostream>
#include <thread>
int main() {
    std::thread([] { std::cout << “Hello!”; });
    return 0;
}

В C ++, если вы забудете присоединиться к своим потокам, вы завершите свою программу. Деструктор std::thread вызовет std::terminate, если поток может быть присоединен: если ваш поток все еще работает, когда поток уничтожен.

fn main() {
    std::thread::spawn(|| println!(“Hello!”));
}

В Rust потоки неявно отсоединяются, когда их дескрипторы опускаются, поэтому эту ошибку сделать невозможно. (Обратите внимание, что вы можете не увидеть Hello! В своем стандартном выводе после завершения основного потока.)

Когда я нигде не сохраняю дескриптор потока, я ожидаю, что поток отсоединится, иначе я бы не отбрасывал его. C ++ рассматривает такое интуитивное поведение как неисправимую ошибку времени выполнения, при которой у нас есть возможность сделать ошибочные состояния непредставимыми.

5. Объединение () отсоединенных () потоков.

#include <iostream>
#include <thread>
int main() {
    std::thread t {[] { std::cout << "Hello!"; }};
    t.detach();
    t.join();
    return 0;
}

Попытка присоединиться к отдельному потоку вызывает сбой.

fn main() {
    let thread = std::thread::spawn(|| println!("Hello!"));
    thread.join().unwrap();
    // no detach
}

Этой проблемы не существует, когда потоки отсоединяются, когда мы отбрасываем дескриптор потока.

6. Передача параметров по ссылке.

#include <iostream>
#include <thread>
int main() {
    std::string hello {"Hello!"};
    std::thread {
        [&](const std::string& hi) {
           std::cout << std::boolalpha << (&hi == &hello);
        },
        hello
    }.join();
    return 0;
}

std::thread передает свои аргументы по значению даже в тех случаях, когда вы этого не ожидаете (std::ref исправит это).

fn main() {
    let hello = "Hello!".to_owned();
    std::thread::spawn(move || println!("{}", hello))
        .join()
        .unwrap();
}

В Rust вы не можете передавать аргументы потокам, вместо этого вы заимствуете (с потоками с заданной областью видимости) или перемещаете свои значения в замыкания.

Бонус: возврат вычисленных значений из потоков

Если мы хотим использовать поток в C ++ для правильного возврата значения, нам нужно использовать некоторый дополнительный механизм синхронизации. Я представлю два наиболее очевидных ответа: хранение по ссылке или std::future.

#include <chrono>
#include <future>
#include <thread>
#include <optional>
#include <iostream>
using namespace std::chrono_literals;
// this only works for simple scenarios
void reference_store() {
    auto data = std::optional<std::string>{std::nullopt};
    auto t = std::thread([&] {
        // calculate the meaning of life
        std::this_thread::sleep_for(500ms);
        data = "42";
    });
    t.join();
    std::cout << *data << '\n';
}
void future() {
    auto promise = std::promise<std::string>{};
    auto future = promise.get_future();
    auto t = std::thread([](auto promise) {
        // calculate the meaning of life
        std::this_thread::sleep_for(500ms);
        promise.set_value("42");
    }, std::move(promise));
    std::cout << future.get() << '\n';
    t.join();
}
int main() {
    reference_store();
    future();
}

Потоки Rust предоставляют механизм для возврата значений напрямую из них, что делает эту операцию встроенной в потоки.

fn main() {
    let thread = std::thread::spawn(|| {
        // calculate the meaning of life
        std::thread::sleep(std::time::Duration::from_millis(500));
        "42".to_owned()
    });
    let result = thread.join().unwrap();
    println!("{}", result);
}