Skip to main content

GraphRun.h File

GraphRun — runtime handle for a compiled actor-style graph. More...

Included Headers

#include "graph/Graph.h"
#include "pipeline/Run.h"
#include <cstdint>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

Namespaces Index

namespace simaai
namespace neat
namespace graph

Classes Index

struct GraphRunOptions

Tuning knobs for a GraphRun — queue capacities, push/pull timeouts, verbosity. More...

struct GraphRunStats

Per-node, per-stream telemetry collector for a running GraphRun. More...

struct StreamStat

Per-stream counters within a single node. More...

struct NodeStat

Aggregated counters for a single node, plus per-stream breakdown. More...

struct Snapshot

Lock-free snapshot of a single node's stats — safe to copy/return. More...

struct GraphRunPullOptions

Tuning knobs for GraphRun::pull_until() / PullSession::run(). More...

class GraphRun

Live runtime handle for a compiled Graph. More...

class Input

Push handle for a node's input port. More...

class Output

Pull handle for a node's output stream. More...

class StallGuard

Tracks progress against a per-stream target and a stall deadline. More...

class PullSession

Fluent builder for the common "collect samples until target/stall" workflow. More...

Description

GraphRun — runtime handle for a compiled actor-style graph.

GraphRun is the runtime-graph counterpart of pipeline::Run: a live, running graph the caller pushes inputs into and pulls outputs out of. Unlike pipeline::Run, GraphRun uses named ports, and exposes per-node Input / Output handles plus an opinionated PullSession for the common collect-until-stall workflow.

See Also

GraphSession to build one

See Also

pipeline::Run for the linear-pipeline counterpart

File Listing

The file content with the documentation metadata removed is:

1
14#pragma once
15
16#include "graph/Graph.h"
17#include "pipeline/Run.h"
18
19#include <cstdint>
20#include <chrono>
21#include <functional>
22#include <memory>
23#include <mutex>
24#include <optional>
25#include <string>
26#include <unordered_map>
27#include <unordered_set>
28#include <vector>
29
30namespace simaai::neat::graph {
31
37 std::size_t edge_queue =
38 256;
40 5000;
41 int pull_timeout_ms = 50;
45
52 GraphRunOptions& enable_board_power(int sample_interval_ms = 100) {
53 power_monitor = board_power_monitor_options(sample_interval_ms);
54 return *this;
55 }
56
62 GraphRunOptions& enable_modalix_som_power(int sample_interval_ms = 100) {
64 return *this;
65 }
66
70 GraphRunOptions& enable_modalix_dvt_power(int sample_interval_ms = 100) {
72 return *this;
73 }
74
78 return *this;
79 }
80};
81
/// Per-stream counters within a single node.
/// All fields are value-initialized, so a default-constructed StreamStat is zeroed.
92 struct StreamStat {
93 int64_t count = 0; ///< Counter for this stream, starting at zero (presumably samples recorded — confirm in record()).
94 std::chrono::steady_clock::time_point first{}; ///< Monotonic-clock time point; presumably when the first sample was seen — TODO confirm.
95 std::chrono::steady_clock::time_point last{}; ///< Monotonic-clock time point; presumably when the latest sample was seen — TODO confirm.
96 bool initialized = false; ///< Starts false; presumably flipped once `first`/`last` hold real values — TODO confirm.
97 };
98
/// Aggregated counters for a single node, plus a per-stream breakdown.
100 struct NodeStat {
101 int64_t total = 0; ///< Node-wide counter, starting at zero (presumably the sum over all streams — confirm in record()).
102 std::chrono::steady_clock::time_point first{}; ///< Monotonic-clock time point; presumably the node's first recorded sample — TODO confirm.
103 std::chrono::steady_clock::time_point last{}; ///< Monotonic-clock time point; presumably the node's latest recorded sample — TODO confirm.
104 bool initialized = false; ///< Starts false; presumably set once `first`/`last` are valid — TODO confirm.
105 std::unordered_map<std::string, StreamStat> streams; ///< Per-stream breakdown keyed by a string (presumably the stream id — verify against record()).
106 };
107
109 struct Snapshot {
110 NodeId node_id = kInvalidNode;
111 int64_t total = 0;
112 std::chrono::steady_clock::time_point first{};
113 std::chrono::steady_clock::time_point last{};
114 std::unordered_map<std::string, int64_t> counts;
115 std::unordered_map<std::string, std::chrono::steady_clock::time_point>
117 };
118
/// Feeds one sample observed on `node_id` into the collector.
/// NOTE(review): defined out of line; which Sample fields drive the per-stream key is not visible here.
120 void record(NodeId node_id, const Sample& sample);
/// Returns the collected stats as a vector of Snapshot values (returned by value).
122 std::vector<Snapshot> snapshot() const;
/// Reports whether the collector is empty (presumably: nothing recorded yet — confirm).
124 bool empty() const;
/// Returns a string -> count map. `nodes` defaults to an empty list
/// (presumably meaning "all nodes" — TODO confirm in the implementation).
126 std::unordered_map<std::string, int64_t>
127 stream_counts(const std::vector<NodeId>& nodes = {}) const;
/// Checks the `expected` stream-id set against what was recorded; `nodes` optionally
/// restricts the check (empty default presumably means "all nodes" — TODO confirm).
129 bool has_missing_streams(const std::unordered_set<std::string>& expected,
130 const std::vector<NodeId>& nodes = {}) const;
/// String counterpart of has_missing_streams(); presumably a printable list of the
/// missing ids — exact format is defined out of line.
132 std::string missing_streams_list(const std::unordered_set<std::string>& expected,
133 const std::vector<NodeId>& nodes = {}) const;
134
135private:
136 mutable std::mutex mu_;
137 std::unordered_map<NodeId, NodeStat> nodes_;
138};
139
149 0;
150 int stall_ms = 0;
151 int timeout_ms = 50;
152 int max_runtime_ms = -1;
153 std::vector<std::string>
155};
156
166class GraphRun {
167public:
/// Push handle for a node's input port.
/// The constructor is private and GraphRun is a friend, so handles are only
/// created by GraphRun.
174 class Input {
175 public:
/// Pushes `sample` via this handle. Returns a bool (presumably success/failure —
/// the definition is out of line).
177 bool push(const Sample& sample) const;
178
179 private:
180 friend class GraphRun;
/// Stores the run pointer, the target node, the port, and whether a port was specified.
181 Input(GraphRun* run, NodeId node, PortId port, bool has_port)
182 : run_(run), node_(node), port_(port), has_port_(has_port) {}
183
// NOTE(review): raw back-pointer to a movable GraphRun; presumably non-owning —
// confirm what happens to outstanding handles when the run is moved or destroyed.
184 GraphRun* run_ = nullptr;
185 NodeId node_ = kInvalidNode; ///< Target node; defaults to the invalid sentinel.
186 PortId port_ = kInvalidPort; ///< Target port; defaults to the invalid sentinel.
187 bool has_port_ = false; ///< Whether `port_` was explicitly supplied at construction.
188 };
189
/// Pull handle for a node's output stream.
/// The constructor is private and GraphRun is a friend, so handles are only
/// created by GraphRun.
196 class Output {
197 public:
/// Attempts to pull one sample; an empty optional means no sample was returned.
/// `timeout_ms` defaults to -1 (presumably "use the run's default" or "block" — TODO confirm).
/// `stats` is optional; when non-null it is presumably updated with the pulled sample.
199 std::optional<Sample> pull(int timeout_ms = -1, GraphRunStats* stats = nullptr) const;
/// Same parameters as pull(), but returns the Sample directly; per its name it
/// presumably throws when no sample is available — exception type not visible here.
201 Sample pull_or_throw(int timeout_ms = -1, GraphRunStats* stats = nullptr) const;
/// Returns the node id this handle was constructed with.
203 NodeId node_id() const {
204 return node_;
205 }
206
207 private:
208 friend class GraphRun;
/// Stores the run pointer and the node whose output this handle reads.
209 Output(GraphRun* run, NodeId node) : run_(run), node_(node) {}
210
// NOTE(review): raw back-pointer to a movable GraphRun; presumably non-owning —
// confirm handle validity across moves of the run.
211 GraphRun* run_ = nullptr;
212 NodeId node_ = kInvalidNode; ///< Source node; defaults to the invalid sentinel.
213 };
214
/// Tracks progress against a per-stream target and a stall deadline.
/// The constructor is private and GraphRun is a friend, so guards are only
/// created by GraphRun. How the state is advanced is not visible in this
/// listing; only the read accessors are shown.
221 class StallGuard {
222 public:
/// Returns the `done_` flag (presumably set once the target is reached — confirm).
226 bool done() const {
227 return done_;
228 }
/// Returns the `stalled_` flag (presumably set when no progress occurred within `stall_ms_` — confirm).
230 bool stalled() const {
231 return stalled_;
232 }
/// Returns the stored `target_progress_` value.
234 int64_t target_progress() const {
235 return target_progress_;
236 }
237
238 private:
239 friend class GraphRun;
/// Takes the node and stream lists by value; defined out of line.
240 StallGuard(std::vector<NodeId> nodes, std::vector<std::string> streams, int per_stream_target,
241 int stall_ms);
242
243 std::vector<NodeId> nodes_; ///< Nodes this guard watches.
244 std::vector<std::string> streams_; ///< Stream ids this guard watches.
245 int per_stream_target_ = 0; ///< Target per stream (presumably a sample count — TODO confirm).
246 int stall_ms_ = 0; ///< Stall window; the name indicates milliseconds.
247 bool initialized_ = false;
248 bool done_ = false;
249 bool stalled_ = false;
250 int64_t target_progress_ = 0;
// NOTE(review): the only private member without the trailing underscore used by
// its siblings; renaming would need the out-of-line definitions updated too.
251 std::chrono::steady_clock::time_point last_progress{};
252 };
253
270 public:
280 PullSession& expect_streams(std::vector<std::string> ids);
282 PullSession& on_sample(std::function<void(const Sample&, NodeId)> cb);
286 void run();
287
288 private:
289 friend class GraphRun;
290 PullSession(GraphRun* run, const std::vector<Output>* outputs, std::vector<NodeId> output_nodes,
292
293 GraphRun* run_ = nullptr;
294 const std::vector<Output>* outputs_ = nullptr;
295 std::vector<NodeId> output_nodes_;
296 GraphRunStats* stats_ = nullptr;
298 std::unordered_set<std::string> expected_;
299 std::unordered_set<std::string> unknown_;
300 bool saw_empty_stream_id_ = false;
301 std::function<void(const Sample&, NodeId)> on_sample_;
302 };
303
/// Default-constructs a handle with no state attached.
305 GraphRun() = default;
/// Copying is disabled; a GraphRun can only be moved.
307 GraphRun(const GraphRun&) = delete;
309 GraphRun& operator=(const GraphRun&) = delete;
310
/// Move operations are declared noexcept and defined out of line.
312 GraphRun(GraphRun&&) noexcept;
314 GraphRun& operator=(GraphRun&&) noexcept;
317
/// Explicit bool conversion (presumably true when the handle has state attached — confirm).
319 explicit operator bool() const noexcept;
/// Reports whether the graph is currently running.
321 bool running() const;
322
/// Pushes `sample` into node `node_id`; this overload takes no port.
324 bool push(NodeId node_id, const Sample& sample);
/// Pushes `sample` into a specific `port` of node `node_id`.
326 bool push(NodeId node_id, PortId port, const Sample& sample);
327
/// Pulls one sample from node `node_id`; an empty optional means none was returned.
/// `timeout_ms` defaults to -1 (meaning of the sentinel is defined out of line — TODO confirm).
329 std::optional<Sample> pull(NodeId node_id, int timeout_ms = -1);
330
/// Returns an Input push handle for `node_id`, without / with an explicit port.
332 Input input(NodeId node_id);
334 Input input(NodeId node_id, PortId port);
337
/// Returns a pointer to const stats (presumably the run's own collector; may be null — confirm).
341 const GraphRunStats* stats() const;
/// Creates a PullSession over `outputs`; `stats` is an optional collector.
/// NOTE(review): PullSession stores a pointer to a vector of Output, so `outputs`
/// presumably must outlive the returned session — confirm.
343 PullSession collect(const std::vector<Output>& outputs, GraphRunStats* stats = nullptr);
344
/// Pulls one sample from any of `outputs`. `out_node`, when non-null, is an
/// out-parameter (presumably receives the node that produced the sample).
346 std::optional<Sample> pull_any(const std::vector<Output>& outputs, int timeout_ms = -1,
347 GraphRunStats* stats = nullptr, NodeId* out_node = nullptr);
/// Warm-up over `outputs` governed by `warmup_count`; returns a bool
/// (presumably whether the warm-up completed — confirm).
349 bool warmup(const std::vector<Output>& outputs, int warmup_count, int timeout_ms = -1);
/// Pulls from `outputs` under the limits in `opt`, recording into `stats`;
/// `on_sample` is an optional per-sample callback (empty by default).
351 void pull_until(const std::vector<Output>& outputs, GraphRunStats& stats,
352 const GraphRunPullOptions& opt,
353 const std::function<void(const Sample&, NodeId)>& on_sample = {});
/// Builds a StallGuard for `outputs`; `stream_ids` defaults to empty
/// (presumably meaning "no explicit stream filter" — TODO confirm).
355 StallGuard stall_guard(const std::vector<Output>& outputs, int per_stream_target, int stall_ms,
356 std::vector<std::string> stream_ids = {});
/// Summary emitters; the output destination and format are defined out of line.
360 void emit_rate_summary() const;
364 void emit_stream_summary() const;
366 void emit_summary(const GraphRunStats& stats) const;
368 void emit_summary() const;
372 std::string metrics_report(const RuntimeMetricsOptions& opt = {},
375 std::string metrics_report(RuntimeMetricsFormat format) const;
376
/// Returns a string description of this run (content/format defined out of line).
378 std::string describe() const;
379
/// Stops the run.
381 void stop();
/// Returns the last error as a string (presumably empty when there is none — confirm).
383 std::string last_error() const;
/// Per its name, presumably throws if an error has been recorded and otherwise
/// returns normally — exception type not visible here.
385 void last_error_or_throw() const;
386
387private:
388 struct State;
389 std::shared_ptr<State> state_;
390
391 explicit GraphRun(std::shared_ptr<State> state);
392 friend class GraphSession;
393};
394
395} // namespace simaai::neat::graph

Generated via doxygen2docusaurus 2.0.0 by Doxygen 1.9.8.