Skip to main content

StrictSync.h File

Strict multi-stream synchronization helpers (pending frame store, token store, release pacer). More...

Included Headers

#include "pipeline/SessionOptions.h"
#include "pipeline/Run.h"
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <unordered_map>
#include <vector>

Namespaces Index

namespace simaai
namespace neat
namespace graph
namespace strict_sync

Classes Index

class PendingVideoStore

Per-stream store of video frames awaiting their matching detection metadata. More...

struct PendingFrame

Frame held in the store while awaiting a matching metadata token. More...

struct StreamStats

Per-stream counters tracking pending depth, hits, and misses. More...

struct StreamState

Per-stream internal state: pending frames keyed by frame_id plus accounting. More...

class YoloTokenStore

Per-stream queue of YOLO (or other detection) tokens awaiting a matching frame. More...

struct Token

Per-stream token: a frame_id plus the time it was enqueued. More...

struct OrderedToken

Globally-ordered token: same as Token plus the originating stream index. More...

struct Stats

Per-stream counters tracking depth, hits, and misses. More...

struct State

Per-stream internal state: token queue plus counters. More...

class ReleasePacer

Paces downstream releases of paired samples to a target frame rate. More...

struct Stats

Per-stream counters tracking sent/dropped/fail counts and queue depth. More...

struct State

Per-stream internal state: queue, worker thread, and live counters. More...

Description

Strict multi-stream synchronization helpers (pending frame store, token store, release pacer).

File Listing

The file content with the documentation metadata removed is:

1
7#pragma once
8
10#include "pipeline/Run.h"
11
12#include <atomic>
13#include <condition_variable>
14#include <cstddef>
15#include <cstdint>
16#include <deque>
17#include <functional>
18#include <memory>
19#include <mutex>
20#include <optional>
21#include <thread>
22#include <unordered_map>
23#include <vector>
24
26
39public:
41 struct PendingFrame {
43 int64_t cap_ms = -1;
44 size_t bytes = 0;
45 };
46
48 struct StreamStats {
49 int64_t enqueued = 0;
50 int64_t matched = 0;
51 int64_t miss = 0;
52 size_t pending_depth = 0;
53 size_t pending_bytes = 0;
54 size_t max_pending_depth = 0;
55 size_t max_pending_bytes = 0;
56 };
57
59 explicit PendingVideoStore(size_t streams);
60
62 bool enqueue(size_t idx, int64_t frame_id, simaai::neat::Sample&& sample, int64_t cap_ms,
63 size_t bytes);
64
66 std::optional<PendingFrame> take(size_t idx, int64_t frame_id);
67
69 StreamStats stats(size_t idx) const;
70
71private:
73 struct StreamState {
74 mutable std::mutex mu;
75 std::unordered_map<int64_t, PendingFrame> pending;
76 std::deque<int64_t> order;
77 size_t bytes_total = 0;
78 StreamStats stats;
79 };
80
81 std::vector<StreamState> states_;
82};
83
95public:
97 struct Token {
98 int64_t frame_id = -1;
99 int64_t enqueued_ms = 0;
100 };
101
103 struct OrderedToken {
104 size_t stream_idx = 0;
105 int64_t frame_id = -1;
106 int64_t enqueued_ms = 0;
107 };
108
110 struct Stats {
111 int64_t enqueued = 0;
112 int64_t dequeued = 0;
113 int64_t miss = 0;
114 size_t depth = 0;
115 size_t max_depth = 0;
116 };
117
119 explicit YoloTokenStore(size_t streams);
120
122 void enqueue(size_t idx, int64_t frame_id);
124 std::optional<OrderedToken> take_ordered();
126 std::optional<Token> take(size_t idx);
128 Stats stats(size_t idx) const;
129
130private:
132 struct State {
133 mutable std::mutex mu;
134 std::deque<Token> q;
135 Stats stats;
136 };
137
138 static int64_t now_ms_i64();
139
140 mutable std::mutex order_mu_;
141 std::deque<OrderedToken> order_q_;
142 std::vector<State> states_;
143};
144
157public:
159 struct Stats {
160 int64_t enqueued = 0;
161 int64_t sent = 0;
162 int64_t dropped = 0;
163 int64_t send_fail = 0;
164 int64_t max_queue_depth = 0;
165 };
166
168 using OnSendResult = std::function<void(size_t /*stream_idx*/, bool /*ok*/)>;
170 using OnDrop = std::function<void(size_t /*stream_idx*/, int64_t /*dropped_count*/)>;
171
173 ReleasePacer(const std::vector<std::shared_ptr<simaai::neat::Run>>& runs, int target_fps,
174 size_t max_queue, OnSendResult on_send_result = {}, OnDrop on_drop = {});
177
179 bool enabled() const {
180 return interval_ms_ > 0;
181 }
183 int64_t interval_ms() const {
184 return interval_ms_;
185 }
187 size_t max_queue() const {
188 return max_queue_;
189 }
190
192 bool enqueue(size_t idx, simaai::neat::Sample&& sample);
194 void stop();
196 Stats stats(size_t idx) const;
197
198private:
200 struct State {
201 mutable std::mutex mu;
202 std::condition_variable cv;
203 std::deque<simaai::neat::Sample> queue;
204 std::thread worker;
205 bool stop = false;
206
207 int64_t next_release_ms = -1;
208 int64_t enqueued = 0;
209 int64_t sent = 0;
210 int64_t dropped = 0;
211 int64_t send_fail = 0;
212 int64_t max_queue_depth = 0;
213 };
214
215 static int64_t now_ms_i64();
216 void worker_loop(size_t idx);
217
218 std::vector<std::shared_ptr<simaai::neat::Run>> runs_;
219 std::vector<std::unique_ptr<State>> states_;
220 OnSendResult on_send_result_;
221 OnDrop on_drop_;
222
223 int64_t interval_ms_ = 0;
224 size_t max_queue_ = 0;
225 std::atomic<bool> stopped_{false};
226};
227
228} // namespace simaai::neat::graph::strict_sync

Generated via doxygen2docusaurus 2.0.0 by Doxygen 1.9.8.