
Tune Throughput and Queue Depth

Difficulty: Advanced
Estimated Read Time: 15-20 minutes
Labels: performance, tuning, async, queues

Concept

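This chapter tunes three knobs that control how an async pipeline behaves under load (queue depth, overflow policy, and metric collection) and then reads back the run's counters to see how many samples were enqueued, dropped, and pulled, and at what latency. Performance tuning only helps once the correctness baseline is stable; this chapter assumes it is.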

The chapter configures the RunOptions fields that a production pipeline typically needs to control:

  • queue_depth: how many in-flight samples the runtime accepts.
  • overflow_policy: Block, KeepLatest, or DropIncoming when the queue is full.
  • enable_metrics: turn on per-run metric collection.

APIs introduced

  • pyneat.RunOptions() with .queue_depth, .overflow_policy, .output_memory, .enable_metrics.
  • pyneat.OverflowPolicy.{Block,KeepLatest,DropIncoming} — the policy values.
  • run.try_push(sample) — non-blocking push that returns whether the sample was accepted.
  • run.stats() — latency/enqueue/pull counters.
  • run.input_stats() — push-side counters (accepted, dropped, queue fullness).

When to use this

  • Throughput bottlenecks: increase queue depth and inspect drop/latency behavior.
  • Low-latency preference: favor KeepLatest in bursty streams.
  • Backpressure-sensitive ingestion: prefer Block for strict loss control.

Prerequisites: Chapter 002 (async basics) and Chapter 011 (diagnostics).


Learning Process

  1. Build an async run path with explicit queue and overflow settings.
  2. Push a deterministic workload and drain outputs to completion.
  3. Inspect metrics and input stats for latency/drop behavior.

Run

Python:

python3 share/sima-neat/tutorials/015_tune_throughput_and_queues/tune_throughput_and_queues.py \
--iters 32 --queue 4 --drop block

C++ (prebuilt):

./lib/sima-neat/tutorials/tutorial_015_tune_throughput_and_queues \
--iters 32 --queue 4 --drop block

C++ (build from source):

./build.sh --target tutorial_015_tune_throughput_and_queues
./build/tutorials-standalone/tutorial_015_tune_throughput_and_queues \
--iters 32 --queue 4 --drop block

To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.

Code

tutorials/015_tune_throughput_and_queues/tune_throughput_and_queues.cpp
// Tune async Session throughput via RunOptions: queue_depth, overflow_policy, metrics.
//
// Usage:
// tutorial_015_tune_throughput_and_queues [--iters 32] [--queue 4] [--drop block|latest|incoming]

#include "neat.h"

#include <opencv2/core.hpp>

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

namespace {

// Finds "--key value" on the command line and copies the value into `out`.
bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
    for (int i = 1; i + 1 < argc; ++i) {
        if (key == argv[i]) {
            out = argv[i + 1];
            return true;
        }
    }
    return false;
}

int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
    std::string value;
    if (!get_arg(argc, argv, key, value))
        return def;
    return std::stoi(value);  // throws std::invalid_argument on non-numeric input
}

// Maps --drop to a policy. Any value other than "latest" or "incoming"
// (including "block" and typos) selects Block.
simaai::neat::OverflowPolicy parse_drop_policy(int argc, char** argv) {
    std::string mode;
    if (!get_arg(argc, argv, "--drop", mode))
        return simaai::neat::OverflowPolicy::Block;
    if (mode == "latest")
        return simaai::neat::OverflowPolicy::KeepLatest;
    if (mode == "incoming")
        return simaai::neat::OverflowPolicy::DropIncoming;
    return simaai::neat::OverflowPolicy::Block;
}

} // namespace

int main(int argc, char** argv) {
    try {
        const int iters = parse_int_arg(argc, argv, "--iters", 32);
        const int queue_depth = parse_int_arg(argc, argv, "--queue", 4);

        cv::Mat rgb(120, 160, CV_8UC3, cv::Scalar(70, 20, 200));
        if (!rgb.isContinuous())
            rgb = rgb.clone();

        simaai::neat::Session session;
        simaai::neat::InputOptions in;
        in.format = "RGB";
        in.width = rgb.cols;
        in.height = rgb.rows;
        in.depth = rgb.channels();
        in.is_live = true;
        session.add(simaai::neat::nodes::Input(in));
        session.add(simaai::neat::nodes::Output());

        // CORE LOGIC
        // RunOptions controls how the async runner buffers and drops frames.
        simaai::neat::RunOptions opt;
        opt.queue_depth = queue_depth;
        opt.overflow_policy = parse_drop_policy(argc, argv);
        opt.output_memory = simaai::neat::OutputMemory::Owned;
        opt.enable_metrics = true;

        auto run = session.build(std::vector<cv::Mat>{rgb}, simaai::neat::RunMode::Async, opt);

        // try_push never blocks; pair it with close_input + drain pull loop.
        for (int i = 0; i < iters; ++i)
            (void)run.try_push(std::vector<cv::Mat>{rgb});
        run.close_input();

        int pulled = 0;
        while (run.pull(/*timeout_ms=*/1000).has_value())
            ++pulled;

        const auto stats = run.stats();
        const auto input_stats = run.input_stats();

        std::cout << "inputs_enqueued=" << stats.inputs_enqueued << "\n";
        std::cout << "inputs_dropped=" << stats.inputs_dropped << "\n";
        std::cout << "outputs_pulled=" << pulled << "\n";
        std::cout << "avg_latency_ms=" << stats.avg_latency_ms << "\n";
        std::cout << "avg_push_us=" << input_stats.avg_push_us << "\n";
        std::cout << "renegotiations=" << input_stats.renegotiations << "\n";
        std::cout << "[OK] 015_tune_throughput_and_queues\n";
        return 0;
    } catch (const std::exception& e) {
        std::cerr << "[FAIL] " << e.what() << "\n";
        return 1;
    }
}
