introduction
Cet article est une suite de cet article .
Flux de données sans fin avec co_yield
Le code ci-dessous implémente un flux de données sans fin. La coroutine est getNextutilisée co_yieldpour créer un flux de données qui commence par startet émet chaque nouvelle valeur avec une étape sur demande step.
//infiniteDataStream.cpp
#include <coroutine>
#include <memory>
#include <iostream>
template <typename T>
struct Generator {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
Generator(handle_type h) : coro(h) {} // (3)
handle_type coro;
std::shared_ptr<T> value;
~Generator() {
if (coro) {
coro.destroy();
}
}
Generator(const Generator &) = delete;
Generator& operator=(const Generator &) = delete;
Generator(Generator &&other) : coro(other.coro) {
other.coro = nullptr;
}
Generator& operator=(Generator &&other) {
coro = other.coro;
other.coro = nullptr;
return *this;
}
T getValue() {
return coro.promise().current_value;
}
bool next() { // (5)
coro.resume();
return not coro.done();
}
struct promise_type {
promise_type() = default; // (1)
~promise_type() = default;
auto initial_suspend() { // (4)
return std::suspend_always{};
}
auto final_suspend() {
return std::suspend_always{};
}
auto get_return_object() { // (2)
return Generator{handle_type::from_promise(*this)};
}
auto return_void() {
return std::suspend_never{};
}
auto yield_value(T value) { // (6)
current_value = value;
return std::suspend_always{};
}
void unhandled_exception() {
std::exit(1);
}
T current_value;
};
};
Generator <int> getNext(int start = 0, int step = 1) {
auto value = start;
for (int i = 0; ; ++i) {
co_yield value;
value += step;
}
}
int main() {
std::cout << "getNext():";
auto gen = getNext();
for (int i = 0; i <= 10; ++i) {
gen.next();
std::cout << " " << gen.getValue(); // (7)
}
std::cout << "\ngetNext(100, -10):";
auto gen2 = getNext(100, -10);
for (int i = 0; i <= 20; ++i) {
gen2.next();
std::cout << " " << gen2.getValue();
}
std::cout << std::endl;
}Note du traducteur: l'assemblage a été réalisé par la commandeg++ -fcoroutines infiniteDataStream.cpp
Dans la fonction main, 2 coroutines sont créées. Le premier gen,, renvoie des valeurs de 0 à 10. Le second ,, gen2- de 100 à -100 par incréments de 10. Sortie programme:
$ ./infDS
getNext(): 0 1 2 3 4 5 6 7 8 9 10
getNext(100, -10): 100 90 80 70 60 50 40 30 20 10 0 -10 -20 -30 -40 -50 -60 -70 -80 -90 -100
Les étiquettes avec des nombres dans les commentaires dans le programme infiniteDataStream.cppdécrivent la première itération dans l'ordre suivant:
- Créer un objet de promesse
- Appel
promise.get_return_object()et stockage du résultat dans une variable locale - Création de générateur
- Appel
promise.initial_suspend(), parce que le générateur est "paresseux", doncsuspend_always - Demander la valeur suivante et renvoyer un drapeau si le générateur est épuisé
- Action activée
co_yield, après quoi la valeur suivante sera disponible - Obtenir la valeur suivante
Dans les itérations suivantes, seules les étapes 5 et 6 sont effectuées.
Synchroniser les flux à l'aide co_await
co_await. , — . (condition variables), promises futures, -. , (spurious wakeups) (lost wakeups).
// senderReceiver.cpp
#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>
class Event {
public:
Event() = default;
Event(const Event &) = delete;
Event(Event &&) = delete;
Event& operator=(const Event &) = delete;
Event& operator=(Event &&) = delete;
class Awaiter;
Awaiter operator co_await() const;
void notify();
private:
friend class Awaiter;
mutable std::atomic<void *> suspendedWaiter{nullptr};
mutable std::atomic<bool> notified{false};
};
class Event::Awaiter {
public:
Awaiter(const Event &e) : event(e) {}
bool await_ready() const;
bool await_suspend(std::coroutine_handle<> ch);
void await_resume() {}
private:
friend class Event;
const Event &event;
std::coroutine_handle<> coroutineHandle;
};
bool Event::Awaiter::await_ready() const {
if (event.suspendedWaiter.load() != nullptr) {
throw std::runtime_error("More than one waiter is not valid");
}
return event.notified; // true - , false -
}
bool Event::Awaiter::await_suspend(std::coroutine_handle<> ch) {
coroutineHandle = ch;
if (event.notified) {
return false;
}
// waiter
event.suspendedWaiter.store(this);
return true;
}
void Event::notify() {
notified = true;
// waiter
auto *waiter = static_cast<Awaiter *>(suspendedWaiter.load());
// waiter
if (waiter != nullptr) {
//
waiter->coroutineHandle.resume();
}
}
Event::Awaiter Event::operator co_await() const {
return Awaiter{*this};
}
struct Task {
struct promise_type {
Task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
Task receiver(Event &event) {
auto start = std::chrono::high_resolution_clock::now();
co_await event;
std::cout << "Got the notification!" << std::endl;
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;
std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}
int main() {
std::cout << "Notification before waiting" << std::endl;
Event event1{};
auto senderThread1 = std::thread([&event1] { event1.notify(); });
auto receiverThread1 = std::thread(receiver, std::ref(event1));
receiverThread1.join();
senderThread1.join();
std::cout << "\nNotification after 2 seconds waiting" << std::endl;
Event event2{};
auto receiverThread2 = std::thread(receiver, std::ref(event2));
auto senderThread2 = std::thread([&event2] {
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
event2.notify();
});
receiverThread2.join();
senderThread2.join();
} : g++ -pthread -fcoroutines senderReceiver.cpp
, . , senderReceiver.cpp senderThread1 senderThread2 event (eventN.notify()). receiver , receiverThread1 receiverThread2. , . .
senderReceiver
$ ./senderReceiver
Notification before waiting
Got the notification!
Waited 3.7006e-05 seconds.
Notification after 2 seconds waiting
Got the notification!
Waited 2.00056 seconds.
Generator Event , . , Generator awaitable awaiter; Event operator co_await awaiter. awaitable awaiter .
, , 2 . , event1 , , event2 , 2 .
senderReceiver.cpp . Event : suspendedWaiter notified. waiter , .
, event1 receiverThread1 . even1.notify() notified waiter. waiter nullptr .. , , waiter->coroutineHandle.resume() . await_ready waiter , , std::runtime_error. . notified true notify, , , .
event2 co_await event , . await_ready. , event.notified false . , await_suspend handle ch corotineHandle. , , . , waiter suspendedWaiter. event2.notify notify. , waiter nullptr. waiterutilise des coroutineHandlecoroutines pour reprendre le travail.