Skip to main content

BlockingQueue.h File

Simple bounded blocking queue for graph runtime. More...

Included Headers

#include <chrono> #include <condition_variable> #include <cstddef> #include <deque> #include <mutex> #include <utility>

Namespaces Index

namespace simaai
namespace neat
namespace graph
namespace runtime

Classes Index

class BlockingQueue<T>

Thread-safe bounded blocking queue used by the runtime mailboxes. More...

Description

Simple bounded blocking queue for graph runtime.

File Listing

The file content with the documentation metadata removed is:

1
6#pragma once
7
8#include <chrono>
9#include <condition_variable>
10#include <cstddef>
11#include <deque>
12#include <mutex>
13#include <utility>
14
16
/// @brief Mutex-protected FIFO queue with an optional capacity limit.
///
/// All public members take the internal mutex, so a single instance may be
/// shared between producer and consumer threads. A capacity of 0 disables the
/// limit (pushes never wait for space). After close(), pushes are rejected
/// while pops keep returning any items that are still queued.
///
/// @tparam T Element type. The `const T&` overloads copy into the queue, the
///           `T&&` overloads move into it; pop() move-assigns out of it.
template <class T> class BlockingQueue {
public:
  /// @brief Creates an open, empty queue.
  /// @param capacity Maximum number of queued items; 0 means no limit.
  explicit BlockingQueue(std::size_t capacity = 0) : capacity_(capacity) {}

  /// @brief Appends a copy of @p item, waiting for free space if the queue is full.
  /// @param item       Value to copy into the queue.
  /// @param timeout_ms Maximum wait in milliseconds; a negative value waits
  ///                   until space is available or the queue is closed.
  /// @return true if the item was queued; false if the queue is (or becomes)
  ///         closed, or if the timeout expires while the queue is still full.
  bool push(const T& item, int timeout_ms = -1) { return push_impl(item, timeout_ms); }

  /// @brief Same as push(const T&, int) but moves @p item into the queue.
  /// @note  @p item is only moved from when the call returns true.
  bool push(T&& item, int timeout_ms = -1) { return push_impl(std::move(item), timeout_ms); }

  /// @brief Appends a copy of @p item without ever waiting.
  /// @return true if the item was queued; false if the queue is closed or full.
  bool try_push(const T& item) { return try_push_impl(item); }

  /// @brief Same as try_push(const T&) but moves @p item into the queue.
  /// @note  @p item is only moved from when the call returns true.
  bool try_push(T&& item) { return try_push_impl(std::move(item)); }

  /// @brief Removes the oldest item and move-assigns it to @p out.
  /// @param out        Receives the item; left untouched when false is returned.
  /// @param timeout_ms Maximum wait in milliseconds; a negative value waits
  ///                   until an item arrives or the queue is closed.
  /// @return true if an item was delivered; false if the queue is still empty
  ///         once the wait ends (timeout, or closed with nothing left).
  bool pop(T& out, int timeout_ms = -1) {
    std::unique_lock<std::mutex> lock(mu_);
    const auto has_item_or_closed = [this] { return closed_ || !queue_.empty(); };
    if (timeout_ms < 0) {
      cv_not_empty_.wait(lock, has_item_or_closed);
    } else {
      // The wait result is deliberately ignored: the emptiness check below
      // covers both the timeout and the closed-and-drained cases.
      cv_not_empty_.wait_for(lock, std::chrono::milliseconds(timeout_ms), has_item_or_closed);
    }
    if (queue_.empty())
      return false;
    out = std::move(queue_.front());
    queue_.pop_front();
    cv_not_full_.notify_one();
    return true;
  }

  /// @brief Marks the queue closed and wakes every thread waiting in push() or pop().
  void close() {
    std::lock_guard<std::mutex> lock(mu_);
    closed_ = true;
    cv_not_empty_.notify_all();
    cv_not_full_.notify_all();
  }

  /// @brief Reports whether close() has been called.
  bool closed() const {
    std::lock_guard<std::mutex> lock(mu_);
    return closed_;
  }

  /// @brief Returns the number of items currently queued.
  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mu_);
    return queue_.size();
  }

private:
  /// Shared body of both push() overloads; U&& forwards either a copy or a move.
  template <class U> bool push_impl(U&& item, int timeout_ms) {
    std::unique_lock<std::mutex> lock(mu_);
    if (closed_)
      return false;
    if (capacity_ > 0) {
      const auto has_room_or_closed = [this] { return closed_ || queue_.size() < capacity_; };
      if (timeout_ms < 0) {
        cv_not_full_.wait(lock, has_room_or_closed);
      } else if (!cv_not_full_.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                                        has_room_or_closed)) {
        return false; // timed out while still full
      }
      // The predicate held, so either there is room or the queue was closed
      // while we were waiting; only the latter still needs rejecting.
      if (closed_)
        return false;
    }
    queue_.push_back(std::forward<U>(item));
    cv_not_empty_.notify_one();
    return true;
  }

  /// Shared body of both try_push() overloads.
  template <class U> bool try_push_impl(U&& item) {
    std::lock_guard<std::mutex> lock(mu_);
    if (closed_)
      return false;
    if (capacity_ > 0 && queue_.size() >= capacity_)
      return false;
    queue_.push_back(std::forward<U>(item));
    cv_not_empty_.notify_one();
    return true;
  }

  mutable std::mutex mu_;                // guards queue_ and closed_
  std::condition_variable cv_not_empty_; // signalled on push and on close
  std::condition_variable cv_not_full_;  // signalled on pop and on close
  std::deque<T> queue_;
  std::size_t capacity_ = 0;             // 0 = no limit
  bool closed_ = false;
};
146
147} // namespace simaai::neat::graph::runtime

Generated via doxygen2docusaurus 2.0.0 by Doxygen 1.9.8.