Run.h File

Run: the live pipeline returned by Session::build, plus runtime options and diagnostics.

Included Headers

#include "nodes/io/Input.h"
#include "pipeline/PowerTelemetry.h"
#include "pipeline/RuntimeMetrics.h"
#include "pipeline/SessionOptions.h"
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <vector>

Namespaces Index

namespace cv
namespace simaai
namespace neat
namespace pipeline_internal

Classes Index

struct RunAdvancedOptions

Advanced runtime tuning knobs (most users never set these).

struct RunOptions

Per-Run runtime options.

struct InputDropInfo

Diagnostic record for a dropped input frame.

struct InputStreamStats

Per-Run input-side telemetry: counts, drops, and timing averages.

struct RunStats

Per-Run end-to-end statistics: counts and latency.

struct RunMeasurementSummary

One-call runtime measurement summary.

struct RunStageStats

Per-stage timing telemetry: how long each stage takes per sample.

struct RunElementTimingStats

Per-element timing, finer-grained than per-stage: one row per GStreamer element.

struct RunElementFlowStats

Per-element data-flow telemetry: buffer and byte counts, plus caps changes.

struct RunElementPadTimingStats

Per-pad timing, the finest-grained telemetry: one row per (element, pad).

struct RunDiagSnapshot

Aggregate diagnostic snapshot: stages, boundaries, per-element, per-pad.

struct MeasureOptions

Options for framework-owned runtime measurement.

struct MeasureLatencyStats

Percentile summary for a measured latency series.

struct MeasurePluginLatency

Aggregated per-plugin/kernel timing captured during a measurement window.

struct MeasureReport

Framework-owned report returned by MeasureScope::stop().

class MeasureScope

Observation scope for measuring an application-owned push/pull interval.

struct RunReportOptions

Toggles for what Run::report() includes in its formatted text output.

class Run

Live pipeline handle: push inputs in, pull outputs out.

Description

Run is what a Session becomes when built. It owns the running GStreamer pipeline, its internal threads (typically 5–15: one per element thread boundary, plus dispatcher workers, plus a bus watcher), and its bounded queues. Application code interacts with a Run by calling push() to submit inputs and pull() to retrieve outputs, or run() as a synchronous shortcut that does both in one call. A Run operates in either Async mode (a continuous pipeline, with push and pull at the caller's pace) or Sync mode (one frame in, one result out, repeated).
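
The push/pull contract over a bounded queue can be illustrated with a small stand-alone sketch. Nothing below is NEAT code: `BoundedQueue` is a hypothetical stand-in, and treating a negative timeout as "wait indefinitely" is an assumption, since the header only shows `timeout_ms = -1` as a default. The sketch contrasts a blocking `push` (the behaviour the `Block` overflow policy name suggests) with a non-blocking `try_push` that reports failure when the queue is full.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical stand-in for a bounded input queue; not the NEAT implementation.
template <typename T>
class BoundedQueue {
public:
  explicit BoundedQueue(std::size_t depth) : depth_(depth) { assert(depth > 0); }

  // Blocking push: waits for a free slot. Returns false only if the queue was closed.
  bool push(T item) {
    std::unique_lock<std::mutex> lock(mu_);
    not_full_.wait(lock, [this] { return items_.size() < depth_ || closed_; });
    if (closed_) return false;
    items_.push_back(std::move(item));
    not_empty_.notify_one();
    return true;
  }

  // Non-blocking push: returns false immediately when the queue is full or closed.
  bool try_push(T item) {
    std::lock_guard<std::mutex> lock(mu_);
    if (closed_ || items_.size() >= depth_) return false;
    items_.push_back(std::move(item));
    not_empty_.notify_one();
    return true;
  }

  // Pull with a timeout in milliseconds. ASSUMPTION: a negative value waits indefinitely.
  std::optional<T> pull(int timeout_ms = -1) {
    std::unique_lock<std::mutex> lock(mu_);
    auto ready = [this] { return !items_.empty() || closed_; };
    if (timeout_ms < 0) {
      not_empty_.wait(lock, ready);
    } else if (!not_empty_.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready)) {
      return std::nullopt;  // timed out with nothing available
    }
    if (items_.empty()) return std::nullopt;  // closed and fully drained
    T item = std::move(items_.front());
    items_.pop_front();
    not_full_.notify_one();
    return item;
  }

  // Rejects further pushes and wakes every waiter; queued items can still be pulled.
  void close() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      closed_ = true;
    }
    not_full_.notify_all();
    not_empty_.notify_all();
  }

private:
  std::mutex mu_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::deque<T> items_;
  std::size_t depth_;
  bool closed_ = false;
};
```

With a depth of 4 (the `queue_depth` default shown in RunOptions), a fifth `try_push` returns false until a `pull` frees a slot, whereas `push` would wait for that slot instead.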

This header also defines:

  • OverflowPolicy — how push() behaves when the input queue is full.
  • RunPreset — preset bundles for common workloads (realtime, balanced, reliable).
  • RunOptions — runtime knobs (queue depth, overflow policy, output memory, metrics).
  • RunStats / InputStreamStats / RunDiagSnapshot — telemetry surfaces.
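
As a rough illustration of how the telemetry surfaces might be consumed, the sketch below derives a per-sample mean from the `samples` and `total_us` fields of RunStageStats and picks the slowest stage. The struct is a local mirror of the fields shown in the file listing, not the real header. The helper names `mean_stage_us` and `slowest_stage` are hypothetical, and reading `total_us` as the sum over all samples is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Local mirror of the RunStageStats fields shown in the file listing.
struct RunStageStats {
  std::string stage_name;
  std::uint64_t samples = 0;
  std::uint64_t total_us = 0;
  std::uint64_t max_us = 0;
};

// Mean time per sample in microseconds; 0.0 when no samples were recorded.
// ASSUMPTION: total_us is the accumulated time across all samples.
double mean_stage_us(const RunStageStats& s) {
  if (s.samples == 0) return 0.0;
  return static_cast<double>(s.total_us) / static_cast<double>(s.samples);
}

// Returns the stage with the highest mean time, or nullptr for an empty list.
const RunStageStats* slowest_stage(const std::vector<RunStageStats>& stages) {
  const RunStageStats* worst = nullptr;
  for (const auto& s : stages) {
    if (worst == nullptr || mean_stage_us(s) > mean_stage_us(*worst)) {
      worst = &s;
    }
  }
  return worst;
}
```

Guarding the zero-sample case matters because a stage that has not yet processed anything would otherwise divide by zero.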

See Also

  • Session::build for how a Run is constructed
  • SessionOptions for build-time options (Run takes runtime options here)
  • "Runs: the live pipeline (and the timing decision)" (§0.13 of the design deep dive)

File Listing

The file content with the documentation metadata removed is:

```cpp
#pragma once

#include "nodes/io/Input.h"

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <vector>

namespace cv {
class Mat;
} // namespace cv

namespace simaai::neat {

class InputStream;
class LatencyProfiler;
struct InputStreamOptions;
namespace pipeline_internal {
struct InputRouteProcessor;
using InputRouteProcessorPtr = std::shared_ptr<const InputRouteProcessor>;
} // namespace pipeline_internal

enum class OverflowPolicy {
  Block = 0,
  // ...
};

enum class RunPreset {
  // ...
};

enum class OutputMemory {
  Auto = 0,
  // ...
  Owned,
};

// ...
  bool copy_input = false;
  std::size_t max_input_bytes = 0;
  // ...
      -1;
};

struct RunOptions {
  int queue_depth = 4;
  bool enable_metrics = false;

  RunOptions& enable_board_power(int sample_interval_ms = 100) {
    power_monitor = board_power_monitor_options(sample_interval_ms);
    return *this;
  }

  RunOptions& enable_modalix_som_power(int sample_interval_ms = 100) {
    // ...
    return *this;
  }

  RunOptions& enable_modalix_dvt_power(int sample_interval_ms = 100) {
    // ...
    return *this;
  }

  // ...
    return *this;
  }

  std::function<void(const struct InputDropInfo&)> on_input_drop;
};

// ...
  std::string media_type;
  std::string format;
  int width = -1;
  int height = -1;
  int depth = -1;
  int64_t frame_id = -1;
  std::string stream_id;
  std::string port_name;
  std::string reason;
};

// ...
  std::uint64_t push_count = 0;
  std::uint64_t push_failures = 0;
  std::uint64_t pull_count = 0;
  std::uint64_t poll_count = 0;
  std::uint64_t dropped_frames = 0;
  std::uint64_t renegotiations = 0;
  std::uint64_t alloc_grows = 0;
  std::uint64_t growth_blocked = 0;
  std::uint64_t renegotiation_blocked = 0;
  double avg_alloc_us = 0.0;
  double avg_map_us = 0.0;
  double avg_copy_us = 0.0;
  double avg_push_us = 0.0;
  double avg_pull_wait_us = 0.0;
  double avg_decode_us = 0.0;
};

struct RunStats {
  std::uint64_t inputs_enqueued = 0;
  std::uint64_t inputs_dropped = 0;
  std::uint64_t inputs_pushed = 0;
  std::uint64_t outputs_ready = 0;
  std::uint64_t outputs_pulled = 0;
  std::uint64_t outputs_dropped = 0;
  double avg_latency_ms = 0.0;
  double min_latency_ms = 0.0;
  double max_latency_ms = 0.0;
};

// ...
  double elapsed_seconds = 0.0;
  double throughput_fps = 0.0;
};

// ...
  std::string stage_name;
  std::uint64_t samples = 0;
  std::uint64_t total_us = 0;
  std::uint64_t max_us = 0;
};

// ...
  std::string element_name;
  std::uint64_t samples = 0;
  std::uint64_t total_us = 0;
  std::uint64_t max_us = 0;
  std::uint64_t min_us = 0;
  std::uint64_t missed_in = 0;
  std::uint64_t missed_out = 0;
};

// ...
  std::string element_name;
  std::uint64_t in_buffers = 0;
  std::uint64_t out_buffers = 0;
  std::uint64_t in_bytes = 0;
  std::uint64_t out_bytes = 0;
  std::uint64_t caps_changes = 0;
};

// ...
  std::string element_name;
  std::string pad_name;
  bool is_sink = false;
  std::uint64_t samples = 0;
  std::uint64_t inter_arrival_total_us = 0;
  std::uint64_t inter_arrival_max_us = 0;
  std::uint64_t queue_wait_samples = 0;
  std::uint64_t queue_wait_total_us = 0;
  std::uint64_t queue_wait_max_us = 0;
  std::uint64_t bytes = 0;
};

// ...
  std::vector<RunStageStats> stages;
  std::vector<BoundaryFlowStats> boundaries;
  std::vector<RunElementTimingStats> element_timings;
  std::vector<RunElementFlowStats> element_flows;
  std::vector<RunElementPadTimingStats> element_pad_timings;
};

// ...
  int duration_ms = 10000;
  int warmup_ms = 1000;
  int timeout_ms = 5000;
  bool include_power = true;

  std::string title = "NEAT measurement";
  std::string model;
  std::string input;
  std::string placement;
};

// ...
  std::size_t count = 0;
  double avg_ms = 0.0;
  double p50_ms = 0.0;
  double p90_ms = 0.0;
  double p95_ms = 0.0;
  double p99_ms = 0.0;
  double max_ms = 0.0;
};

// ...
  std::string name;
  std::uint64_t calls = 0;
  double avg_ms = 0.0;
  double min_ms = 0.0;
  double max_ms = 0.0;
};

// ...
  std::size_t warmup_iterations = 0;
  std::size_t outputs = 0;
  double elapsed_s = 0.0;

  std::vector<MeasurePluginLatency> plugin_latency;

  std::uint64_t inputs_pushed = 0;
  std::uint64_t outputs_pulled = 0;
  std::uint64_t inputs_dropped = 0;
  std::uint64_t outputs_dropped = 0;

  std::string to_text() const;
};

// ...
public:
  MeasureScope& operator=(MeasureScope&&) noexcept;

  MeasureScope(const MeasureScope&) = delete;
  MeasureScope& operator=(const MeasureScope&) = delete;

  bool stopped() const;

private:
  friend class Run;
  struct Impl;
  explicit MeasureScope(std::unique_ptr<Impl> impl);
  std::unique_ptr<Impl> impl_;
};

// ...
  bool include_pipeline = true;
  bool include_stage_timings = true;
  bool include_element_timings = true;
  bool include_boundaries = true;
  bool include_flow_stats = true;
  bool include_node_reports = false;
  bool include_next_cpu = false;
  bool include_queue_depth = true;
  bool include_num_buffers = true;
  bool include_run_stats = true;
  bool include_input_stats = true;
  bool include_power = true;
  bool include_system_info = false;
};

class Run {
public:
  Run() = default;
  Run(const Run&) = delete;
  Run& operator=(const Run&) = delete;

  Run(Run&&) noexcept;
  Run& operator=(Run&&) noexcept;
  ~Run();

  explicit operator bool() const noexcept;
  bool can_push() const;
  bool can_pull() const;
  bool running() const;

  bool push(const std::vector<cv::Mat>& inputs);
  bool try_push(const std::vector<cv::Mat>& inputs);
  bool push(const TensorList& inputs);
  bool try_push(const TensorList& inputs);
  bool push(const SampleList& msgs);
  bool try_push(const SampleList& msgs);
  bool push_holder(const std::shared_ptr<void>& holder);
  bool try_push_holder(const std::shared_ptr<void>& holder);
  void close_input();
  PullStatus pull(int timeout_ms, Sample& out, PullError* err = nullptr);
  std::optional<Sample> pull(int timeout_ms = -1);
  TensorList pull_tensors(int timeout_ms = -1);
  SampleList pull_samples(int timeout_ms = -1);
  TensorList run(const std::vector<cv::Mat>& inputs, int timeout_ms = -1);
  TensorList run(const TensorList& inputs, int timeout_ms = -1);
  SampleList run(const SampleList& inputs, int timeout_ms = -1);

  MeasureScope start_measurement(const MeasureOptions& opt = {});
  int warmup(const std::vector<cv::Mat>& inputs, int warm = -1, int timeout_ms = -1);

  RunStats stats() const;
  std::string metrics_report(const RuntimeMetricsOptions& opt = {},
                             // ...
  std::string metrics_report(RuntimeMetricsFormat format) const;
  std::string report(const RunReportOptions& opt = {}) const;
  std::string last_error() const;
  std::string diagnostics_summary() const;

  void stop();
  void close();

private:
  friend class MeasureScope;
  struct State;
  std::shared_ptr<State> state_;
  bool owns_ref_ = false;

  explicit Run(std::shared_ptr<State> state);
  void require_async_mode(const char* where) const;
  void require_async_pull_mode(const char* where) const;
  void enqueue_run_images(const std::vector<cv::Mat>& inputs);
  void enqueue_run_tensors(const TensorList& inputs);
  void enqueue_run_samples(const SampleList& inputs);
  TensorList pull_tensors_strict(int timeout_ms);
  SampleList pull_samples_strict(int timeout_ms);
  bool push_impl(const cv::Mat& input, bool block);
  bool push_impl(const simaai::neat::Tensor& input, bool block);
  bool push_holder_impl(const std::shared_ptr<void>& holder, bool block);
  bool push_message_impl(const Sample& msg, bool block);
  bool push_sample_impl(const Sample& msg, bool block);
  static Run create(InputStream stream, const RunOptions& opt,
                    const struct InputStreamOptions& stream_opt, RunMode mode = RunMode::Async,
                    const std::optional<InputOptions>& tensor_input_opt_for_cv = std::nullopt,
                    pipeline_internal::InputRouteProcessorPtr input_route_processor = nullptr);
  friend class Session;
};

} // namespace simaai::neat
```
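
The percentile fields in the listing (`p50_ms` through `p99_ms`) can be made concrete with a short sketch of one way to fill such a summary from raw latency samples. The struct is a local mirror of the listed fields, `summarize_latency` and `percentile_nearest_rank` are hypothetical helpers, and the nearest-rank method is an assumption: the header does not say which percentile definition the framework uses.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <numeric>
#include <vector>

// Local mirror of the latency-summary fields shown in the file listing.
struct MeasureLatencyStats {
  std::size_t count = 0;
  double avg_ms = 0.0;
  double p50_ms = 0.0;
  double p90_ms = 0.0;
  double p95_ms = 0.0;
  double p99_ms = 0.0;
  double max_ms = 0.0;
};

// Nearest-rank percentile over an ascending-sorted, non-empty series.
// ASSUMPTION: the framework's own percentile definition may differ.
double percentile_nearest_rank(const std::vector<double>& sorted, double pct) {
  assert(!sorted.empty());
  const double rank = std::ceil(pct * static_cast<double>(sorted.size()) / 100.0);
  const std::size_t idx = rank < 1.0 ? 0 : static_cast<std::size_t>(rank) - 1;
  return sorted[std::min(idx, sorted.size() - 1)];
}

// Builds a summary from raw per-sample latencies in milliseconds.
MeasureLatencyStats summarize_latency(std::vector<double> samples_ms) {
  MeasureLatencyStats out;
  if (samples_ms.empty()) return out;  // all fields stay at their zero defaults
  std::sort(samples_ms.begin(), samples_ms.end());
  out.count = samples_ms.size();
  out.avg_ms = std::accumulate(samples_ms.begin(), samples_ms.end(), 0.0) /
               static_cast<double>(samples_ms.size());
  out.p50_ms = percentile_nearest_rank(samples_ms, 50.0);
  out.p90_ms = percentile_nearest_rank(samples_ms, 90.0);
  out.p95_ms = percentile_nearest_rank(samples_ms, 95.0);
  out.p99_ms = percentile_nearest_rank(samples_ms, 99.0);
  out.max_ms = samples_ms.back();
  return out;
}
```

Nearest-rank always returns an observed sample rather than an interpolated value, which keeps tail figures such as p99 tied to a latency that actually occurred.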

Generated via doxygen2docusaurus 2.0.0 by Doxygen 1.9.8.