wnw231423's blog

Reason the world.

Rust to modern C++ 04: Concurrency && Asynchronous Start

发布于 # learnings # C++

Basic

在Rust中, 我们通过thread::spawn()创建一个JoinHandle对象, 并使用join()来汇入.

fn main() {
	let handle = thread::spawn(|| {...});
	// -snip
	handle.join().unwarp();
}

在C++中, 类似地, 我们创建一个std::thread对象. 区别于Rust, 由于离开作用域后, std::thread的析构函数会调用std::terminate(), 所以我们必须在销毁前决定是join还是detach. 而在Rust中, detach是隐式的.

void f() {...}

int main() {
	auto my_thread = std:thread(f);
	my_thread.detach();  // or join
}

需要注意因为异常导致生命周期的问题, 可能需要在异常中也调用join()或者利用RAII. 参考CCIA 2.1.3.

创建std::thread时第一个参数是一个函数, 接下来的参数可以是要传给函数的参数. thread的构造器会拷贝提供的变量, 但内部的代码会把拷贝的参数以右值为实参调用函数, 所以如果函数接受T&, 考虑使用std::ref将参数转换成引用形式.

另外, std::threadstd::unique_ptr类似, 是可移动不可复制的.

在并发编程中有两种基本模型, 分别是共享内存式的模型和消息传递式的模式.

Shared memory

在Rust中, 对于共享数据, 用std::sync::Mutex<T>来保护共享数据. Rust中数据和锁是一体的, 为了让多个线程共享所有权, 则需要使用std::sync::Arc<T>.

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
	let counter = Arc::new(Mutex::new(0));
	let mut handles = vec![];

	for _ in 0..10 {
		let counter = Arc::clone(&counter);
		let handle = thread::spawn(move || {
			let mut num = counter.lock().unwrap();
			*num += 1;
		})
		handles.push(handle);
	}

	for handle in handles {
		handle.join().unwrap();
	}
}

数据和锁一体是比较”高级”的概念. 最早人们在C语言中手动实现的锁和数据是分离的. 到了C++11, 添加了对并发模型的支持, 不过C++中的锁和数据还是分离的.

C++通过std::mutex创建一个互斥量(我在后面就为了方便以锁来指称), 并通过构造std::lock_guard<T>来, 利用RAII来上锁和解锁:

std::list<int> some_list;
std::mutex some_mutex;

void add_to_list(int new_value) {
	std::lock_guard<std::mutex> guard(some_mutex);
	some_list.push_back(new_value);
}

bool list_contains(int value_to_find) {
	std::lock_guard<std::mutex> guard(some_mutex); // 4
	return std::find(some_list.begin(),some_list.end(),value_to_find) !=
some_list.end();
}

race condition between APIs

尽管实现了对共享数据的互斥访问, 但事情并没有结束. 举个例子, 对于一个栈, 我们先top()查看栈顶元素, 然后pop()弹出栈顶元素, 这个过程中可能会有其他的线程做push()操作导致结果不一致. 为了实现一个线程安全的数据结构, 一个办法是为整个数据结构设置一个全局互斥量, 来保护操作的原子性. 此时又可能会需要考虑锁的粒度大小的问题.

Deadlock && std::scoped_lock<T>

死锁问题就不再赘述, 在C++17中, 引入了std::scoped_lock<T> RAII模板类型, 和std::lock_guard<T>功能相同, 不过可以获取多个锁.

std::scoped_lock<std::mutex,std::mutex> guard(lhs.m,rhs.m);
// C++17可以自动推导模板参数,可简写为:
std::scoped_lock guard(lhs.m,rhs.m);

std::unique_lock<T>

简单来说, std::unique_lock提供了更多的功能, 例如将std::defer_lock作为第二个参数, 则可以实现延迟上锁:

std::unique_lock<std::mutex> lk(m, std::defer_lock);
// 做其他工作
lk.lock(); // 延迟上锁

除此之外还有std::adopt_lock, std::try_to_lock等. 还可以中途解锁unlock()再加锁lock().

std::unique_lock可以用于不同域之间互斥量的传递, 它是一个可移动但不可复制的类型.

std::shared_mutexandstd::shared_lock<T>

std::shared_mutex是一个允许共享读, 互斥写的互斥量, 适用于读操作频繁, 写操作很少的场景. 通过std::unique_lock<T>获得写锁, 通过std::shared_lock<T>获得读锁:

#include <shared_mutex>
#include <mutex>

std::shared_mutex rw_mutex;
int shared_data = 0;

void write_data() {
    // 获取独占锁(写锁),阻塞其他所有读者和写者
    std::unique_lock<std::shared_mutex> lock(rw_mutex); 
    // 或者使用 std::lock_guard<std::shared_mutex> lock(rw_mutex);
    
    shared_data = 42; 
    // 离开作用域时自动释放写锁
}

void read_data() {
    // 获取共享锁(读锁),允许其他读者同时进入,但阻塞写者
    std::shared_lock<std::shared_mutex> lock(rw_mutex);
    
    int temp = shared_data;
    // 离开作用域时自动释放读锁
}

std::shared_mutex对应Rust中的std::sync::RwLock<T>, 调用.read().lock()分别获得读/写锁.

同步

同步简单来说就是, 当多个任务在跑时, 某些地方需要有先后顺序, 例如厨房中有人煮饭有人准备食材有人炒菜, 但炒菜必须等待准备食材准备好一道菜的食材, 才能开始炒.

std::condition_variable

条件变量, 或者睡眠锁. 在等待时需要释放资源对应的锁, 所以需要搭配std::unique_lock<T>使用:

std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;

// tx
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();

// rx
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(
	lk, [] {return !data_queue.empty();});
auto data = data_queue.front();
data_queue.pop();
lk.unlock();
process(data);

在Rust中对应std::sync::CondVar.

Future

Rust中的Future trait和Typescript中的Promise<T>, 在C++中则为future, 包括std::future<>std::shared_future<>.

std::async()函数会启动一个异步任务, 并返回一个std::future对象, 使用的方法和std::thread()类似.

std::async()允许传递一个额外参数std::launch::async保证新开一个线程; std::launch::defered表明惰性求值. 默认是std::launch::defered | std::launch::async, 表示可以选择两种方式的一种.

auto f = std::async(std::launch::async, f, 42);
auto x = f.get();

Rust中的Future是惰性求值的.

std::future是只移动地, 独享结果, 通过调用get()一次性地获取数据. std::shared_future则是可拷贝的.

std::packaged_task<>

std::packaged_task<>简单来理解就是一个任务, 通过get_future()方法你可以获得这个任务相应的future. 任务可以被调用或者传给其他线程, 从而开始执行任务. 任务会自动向future填值或者抛出异常, 类似于Typescript中Promise<T>的fulfill/reject.

void gui_thread() {
	while(!gui_shutdown()) {
		std::packaged_task<void()> task;
		{
			std::lock_guard lk(m);
			if (!tasks.empty()) {
				task = std::move(task.front());
				tasks.pop_front();
			}
		}
		task();
	}
}

template<typename Func>
std::future<void> post_task_for_gui_thread(Func f) {
	std::packaged_task<void()> task(f);
	std::future<void> res = task.get_future();
	std::lock_guard lk(m);
	tasks.push_back(std::move(task));
	return res;
}

std::promises<T>

std::promises<T>std::future<T>是相关联的, future可以阻塞等待进程, 而promise则用于让提供数据的进程对相关的值进行设置, 并令future就绪. 通过set_value()方法设置值, 如果出现异常, 则通过set_exception().

extern std::promise<double> some_promise;
try {
	some_promise.set_value(calculate_value());
} catch(...) {
	some_promise.set_exception(std::current_exception());
}

总结

借助了大模型, 主要是对Rust, C++, Typescript三者做一下对比.

1) “执行单元”:Thread / Task / Future

C++

Rust

TypeScript(JS)


2) “共享内存互斥”:mutex / lock / guard

C++

Rust

TypeScript


3) “消息传递/异步结果”:future / promise / packaged_task

C++:future/promise/packaged_task

Rust:Future(async) + channels(消息)

TS:Promise


4) 三者最关键的“默认语义差异”(容易混淆的点)

  1. C++ std::async 默认不保证开线程
    默认是 launch::async | launch::deferred,实现可选 deferred;你以为并行了,可能没有。
  2. Rust Future 默认不执行(惰性)
    async fn 返回 future,除非 .await 或被 executor poll,否则不跑;这点跟很多人对 TS Promise “创建即开始” 的直觉不同。
  3. TS Promise 的强项是“组合”,不是“并行”
    Promise.all 组合的是异步 I/O 任务的完成;CPU 并行不靠它。

后记

消息传递模型没写, C++没有类似Rust中mpsc的标准库的东西, 应该要自己手写或者找别的轮子. 主要还是做了下基础的梳理, 方便后续实践做点项目, 可能有写错的地方, 如果有欢迎批评指正.