В этот раз реализую простейший примитив
синхронизации двух короутин. Его назначение довольно просто: одна
короутина приостанавливается на ожидании сигнала, другая этот сигнал
отправляет. По сути это аналог
пары std::future<void>
/std::promise<void>
.
Короутины могут выполняться в разных потоках, ожидать сигнала может
только одна короутина.
#include "event.h"
;
Event event
auto consumer = [&]() -> awaitable<void> {
std::cout << "Waiting... ";
boost::asio::any_io_executor executor = co_await boost::asio::this_coro::executor;
.wait(executor);
co_wait eventstd::cout << "event received, consumer is done." << std::endl;
co_return;
}
auto producer = [&]() -> awaitable<void> {
std::cout << "Send event" << std::endl;
.set();
eventco_return;
}
auto main = []() -> awaitable<void> {
co_await(
() &&
process()
check);
co_return;
};
;
io_context ioContext(ioContext, main(), detached);
co_spawn.run(); ioContext
Реализован он так:
class Event
{
enum class State { not_set, not_set_consumer_waiting, set };
std::atomic<State> _state;
std::move_only_function<void()> _handler;
public:
() : _state{State::not_set} {}
Event
boost::asio::awaitable<void> wait(boost::asio::any_io_executor executor) {
auto initiate = [this, executor]<typename Handler>(Handler&& handler) mutable
{
this->_handler = [executor, handler = std::forward<Handler>(handler)]() mutable {
boost::asio::post(executor, std::move(handler));
};
= State::not_set;
State oldState const bool isWaiting = _state.compare_exchange_strong(
,
oldState::not_set_consumer_waiting,
Statestd::memory_order_release,
std::memory_order_acquire);
if (!isWaiting) {
this->_handler();
}
};
return boost::asio::async_initiate<
decltype(boost::asio::use_awaitable), void()>(
, boost::asio::use_awaitable);
initiate}
void set() {
const State oldState = _state.exchange(State::set, std::memory_order_acq_rel);
if (oldState == State::not_set_consumer_waiting) {
();
_handler}
}
};
Механизм межпоточной инхронизации (строки 18 и 35) взят из реализации cppcoro, пробуждение реализовано аналогично функции schedule, отправкой функтора на исполнение в экзекьютер. Особенность этой реализации в том, что она всегда приостанавливает короутину-подписчек, даже если к моменту вызова метода wait другой поток уже переключил состояние эвента методом set. Аналогично этому в Boost работают асинхронные операции:
Regardless of whether the asynchronous operation completes immediately or not, the completion handler will not be invoked from within this function. On immediate completion, invocation of the handler will be performed in a manner equivalent to using post.
Это конечно не самая оптимальная реализация, но тем не мение ее можно использовать для реализации sequence barrier и в дальнейшем кольцевого буфера.
#include "event.h"
#include "schedule.h"
#include <boost/asio/io_context.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/test/unit_test.hpp>
BOOST_AUTO_TEST_CASE(test_Event)
{
bool reachedPointA = false;
bool reachedPointB = false;
;
Event event
auto consumer = [&]() -> boost::asio::awaitable<void> {
= true;
reachedPointA
boost::asio::any_io_executor executor = co_await boost::asio::this_coro::executor;
co_await event.wait(executor);
= true;
reachedPointB co_return;
};
auto producer = [&]() -> boost::asio::awaitable<void> {
BOOST_TEST(reachedPointA);
BOOST_TEST(!reachedPointB);
boost::asio::any_io_executor executor = co_await boost::asio::this_coro::executor;
co_await schedule(executor);
BOOST_TEST(reachedPointA);
BOOST_TEST(!reachedPointB);
.set();
eventco_return;
};
auto main = [&]() -> boost::asio::awaitable<void> {
using namespace boost::asio::experimental::awaitable_operators;
co_await(consumer() && producer());
co_return;
};
boost::asio::io_context ioContext;
boost::asio::co_spawn(ioContext, main(), boost::asio::detached);
.run();
ioContext
BOOST_TEST(reachedPointA);
BOOST_TEST(reachedPointB);
}
BOOST_AUTO_TEST_CASE(test_Event_set_before_wait)
{
bool reachedPointA = false;
bool reachedPointB = false;
;
Event event
.set();
event
auto consumer = [&]() -> boost::asio::awaitable<void> {
= true;
reachedPointA
boost::asio::any_io_executor executor = co_await boost::asio::this_coro::executor;
co_await event.wait(executor);
= true;
reachedPointB co_return;
};
auto producer = [&]() -> boost::asio::awaitable<void> {
BOOST_TEST(reachedPointA);
BOOST_TEST(!reachedPointB);
co_return;
};
auto main = [&]() -> boost::asio::awaitable<void> {
using namespace boost::asio::experimental::awaitable_operators;
co_await(consumer() && producer());
co_return;
};
boost::asio::io_context ioContext;
boost::asio::co_spawn(ioContext, main(), boost::asio::detached);
.run();
ioContext
BOOST_TEST(reachedPointA);
BOOST_TEST(reachedPointB);
}
Код тут.
У этой статьи есть продолжение.
August 2022