Tune Throughput and Queue Depth
| Field | Value |
|---|---|
| Difficulty | Advanced |
| Estimated Read Time | 15-20 minutes |
| Labels | performance, tuning, async, queues |
Concept
Tune three knobs that control async pipeline behavior under load (queue depth, overflow policy, and metrics), then read back what happened. Performance tuning only pays off once the correctness baseline is stable; this chapter assumes it is.
The chapter exercises the RunOptions fields that a production pipeline needs to control:
- `queue_depth`: how many in-flight samples the runtime accepts.
- `overflow_policy`: what happens when the queue is full (`Block`, `KeepLatest`, or `DropIncoming`).
- `enable_metrics`: turns on per-run metric collection.
APIs introduced
- `pyneat.RunOptions()` with `.queue_depth`, `.overflow_policy`, `.output_memory`, and `.enable_metrics`.
- `pyneat.OverflowPolicy.{Block,KeepLatest,DropIncoming}`: the policy values.
- `run.try_push(sample)`: non-blocking push that returns whether the sample was accepted.
- `run.stats()`: latency, enqueue, and pull counters.
- `run.input_stats()`: push-side counters (accepted, dropped, queue fullness).
When to use this
- Throughput bottlenecks: increase queue depth and inspect drop/latency behavior.
- Low-latency preference: favor `KeepLatest` in bursty streams.
- Backpressure-sensitive ingestion: prefer `Block` for strict loss control.
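A rough model makes the depth/latency trade-off concrete. It is a simplification, not a property of the runtime: it assumes a single FIFO stage that processes one sample at a time in a constant time $t_s$. Under that assumption, a sample entering a full queue of depth $D$ waits behind roughly $D$ samples:

```latex
W_{\text{wait}} \approx D \cdot t_s,
\qquad D = 4,\; t_s = 10\,\text{ms} \;\Rightarrow\; W_{\text{wait}} \approx 40\,\text{ms}
```

The wait grows linearly with $D$. That is why raising `queue_depth` to absorb bursts should be paired with checking the reported latency.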
Prerequisites
- Chapter 002 (async basics)
- Chapter 011 (diagnostics)
Learning Process
- Build an async run path with explicit queue and overflow settings.
- Push a deterministic workload and drain outputs to completion.
- Inspect metrics and input stats for latency/drop behavior.
Run
Python:
```bash
python3 share/sima-neat/tutorials/015_tune_throughput_and_queues/tune_throughput_and_queues.py \
  --iters 32 --queue 4 --drop block
```
C++ (prebuilt):
```bash
./lib/sima-neat/tutorials/tutorial_015_tune_throughput_and_queues \
  --iters 32 --queue 4 --drop block
```
C++ (build from source):
```bash
./build.sh --target tutorial_015_tune_throughput_and_queues
./build/tutorials-standalone/tutorial_015_tune_throughput_and_queues \
  --iters 32 --queue 4 --drop block
```
To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.
Code
tutorials/015_tune_throughput_and_queues/tune_throughput_and_queues.cpp
```cpp
// Tune async Session throughput via RunOptions: queue_depth, overflow_policy, metrics.
//
// Usage:
//   tutorial_015_tune_throughput_and_queues [--iters 32] [--queue 4] [--drop block|latest|incoming]
#include "neat.h"

#include <opencv2/core.hpp>

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

namespace {

// Looks up the value that follows `key` on the command line.
bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
    for (int i = 1; i + 1 < argc; ++i) {
        if (key == argv[i]) {
            out = argv[i + 1];
            return true;
        }
    }
    return false;
}

// Parses an integer flag, falling back to `def` when the flag is absent.
int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
    std::string value;
    if (!get_arg(argc, argv, key, value))
        return def;
    return std::stoi(value);  // throws on non-numeric input (caught in main)
}

// Maps --drop {block|latest|incoming} to an OverflowPolicy; unknown values fall back to Block.
simaai::neat::OverflowPolicy parse_drop_policy(int argc, char** argv) {
    std::string mode;
    if (!get_arg(argc, argv, "--drop", mode))
        return simaai::neat::OverflowPolicy::Block;
    if (mode == "latest")
        return simaai::neat::OverflowPolicy::KeepLatest;
    if (mode == "incoming")
        return simaai::neat::OverflowPolicy::DropIncoming;
    return simaai::neat::OverflowPolicy::Block;
}

}  // namespace

int main(int argc, char** argv) {
    try {
        const int iters = parse_int_arg(argc, argv, "--iters", 32);
        const int queue_depth = parse_int_arg(argc, argv, "--queue", 4);

        // Deterministic input: one solid-color 160x120 RGB frame reused for every push.
        cv::Mat rgb(120, 160, CV_8UC3, cv::Scalar(70, 20, 200));
        if (!rgb.isContinuous())
            rgb = rgb.clone();

        simaai::neat::Session session;
        simaai::neat::InputOptions in;
        in.format = "RGB";
        in.width = rgb.cols;
        in.height = rgb.rows;
        in.depth = rgb.channels();
        in.is_live = true;
        session.add(simaai::neat::nodes::Input(in));
        session.add(simaai::neat::nodes::Output());

        // CORE LOGIC
        // RunOptions controls how the async runner buffers and drops frames.
        simaai::neat::RunOptions opt;
        opt.queue_depth = queue_depth;
        opt.overflow_policy = parse_drop_policy(argc, argv);
        opt.output_memory = simaai::neat::OutputMemory::Owned;
        opt.enable_metrics = true;
        auto run = session.build(std::vector<cv::Mat>{rgb}, simaai::neat::RunMode::Async, opt);

        // try_push never blocks; pair it with close_input + drain pull loop.
        for (int i = 0; i < iters; ++i)
            (void)run.try_push(std::vector<cv::Mat>{rgb});
        run.close_input();

        int pulled = 0;
        while (run.pull(/*timeout_ms=*/1000).has_value())
            ++pulled;

        const auto stats = run.stats();
        const auto input_stats = run.input_stats();
        std::cout << "inputs_enqueued=" << stats.inputs_enqueued << "\n";
        std::cout << "inputs_dropped=" << stats.inputs_dropped << "\n";
        std::cout << "outputs_pulled=" << pulled << "\n";
        std::cout << "avg_latency_ms=" << stats.avg_latency_ms << "\n";
        std::cout << "avg_push_us=" << input_stats.avg_push_us << "\n";
        std::cout << "renegotiations=" << input_stats.renegotiations << "\n";
        std::cout << "[OK] 015_tune_throughput_and_queues\n";
        return 0;
    } catch (const std::exception& e) {
        std::cerr << "[FAIL] " << e.what() << "\n";
        return 1;
    }
}
```