Run Inference Asynchronously
| Field | Value |
|---|---|
| Difficulty | Beginner |
| Estimated Read Time | 10-15 minutes |
| Labels | async, push-pull, throughput, runtime |
Concept
Feed a model from a producer thread while a second thread consumes predictions, so that input and output are decoupled and throughput improves. This chapter reuses the ResNet path from chapter 001 and runs it asynchronously.
In a synchronous loop, one thread blocks while waiting for each result. That underutilizes compute when input production and output consumption can overlap. Async execution improves throughput by decoupling:
- push(...) feeds inputs as they become ready.
- pull(...) consumes outputs independently.
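The overlap described above can be observed without any inference hardware. The sketch below is a standard-library stand-in, not pyneat code: `produce` and `infer` are hypothetical functions that sleep to simulate input preparation and model latency, and a `queue.Queue` plays the role of the push/pull boundary. The sleep durations are assumptions chosen only for illustration.

```python
import queue
import threading
import time

PRODUCE_S = 0.01  # assumed time to prepare one input
INFER_S = 0.01    # assumed time for one inference


def produce(i):
    time.sleep(PRODUCE_S)  # simulate decoding or capturing a frame
    return i


def infer(x):
    time.sleep(INFER_S)  # simulate model latency
    return x * 2


def run_sync(n):
    # One thread does everything: each frame costs produce + infer.
    return [infer(produce(i)) for i in range(n)]


def run_overlapped(n):
    # A producer thread prepares inputs while the caller runs inference.
    q = queue.Queue()

    def producer():
        for i in range(n):
            q.put(produce(i))
        q.put(None)  # sentinel: no more inputs

    t = threading.Thread(target=producer)
    t.start()
    results = []
    while (item := q.get()) is not None:
        results.append(infer(item))
    t.join()
    return results


def timed(fn, n):
    start = time.perf_counter()
    out = fn(n)
    return out, time.perf_counter() - start


if __name__ == "__main__":
    sync_out, sync_s = timed(run_sync, 8)
    over_out, over_s = timed(run_overlapped, 8)
    print(f"sync={sync_s:.3f}s overlapped={over_s:.3f}s same={sync_out == over_out}")
```

Both functions return the same results in the same order. The overlapped version should typically finish sooner because `time.sleep` releases the interpreter lock, so the two stages really do run concurrently here.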
APIs introduced
- pyneat.Session() + session.add(model.session()): compose a model into a runnable session.
- session.build(sample, pyneat.RunMode.Async): produce an async Run handle.
- run.push(frame), run.pull(timeout_ms), run.close_input(): the producer/consumer pair.
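To make the push/pull/close_input contract concrete, here is a toy handle built only on the Python standard library. It is a hypothetical stand-in, not the pyneat implementation: the class name `ToyAsyncRun`, the worker thread, and the choice to return `None` from `pull` on timeout are all assumptions made for illustration.

```python
import queue
import threading

_CLOSED = object()  # private sentinel marking the end of the input stream


class ToyAsyncRun:
    """Stand-in for an async run handle; NOT the pyneat implementation."""

    def __init__(self, fn):
        self._fn = fn
        self._inputs = queue.Queue()
        self._outputs = queue.Queue()
        self._worker = threading.Thread(target=self._loop, daemon=True)
        self._worker.start()

    def _loop(self):
        # Process inputs in arrival order until the input side is closed.
        while (item := self._inputs.get()) is not _CLOSED:
            self._outputs.put(self._fn(item))

    def push(self, item):
        # Hand one input to the worker without waiting for its result.
        self._inputs.put(item)

    def close_input(self):
        # Signal that no further inputs will be pushed.
        self._inputs.put(_CLOSED)

    def pull(self, timeout_ms):
        # Return one output, or None if nothing arrives within the timeout.
        try:
            return self._outputs.get(timeout=timeout_ms / 1000)
        except queue.Empty:
            return None


if __name__ == "__main__":
    run = ToyAsyncRun(lambda x: x * x)
    for frame in (1, 2, 3):
        run.push(frame)
    run.close_input()
    print([run.pull(timeout_ms=1000) for _ in range(3)])  # [1, 4, 9]
```

The producer never waits on a result and the consumer never waits on the producer directly; the two queues are the only coupling between them.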
When to use this
Camera streams, batch processing, or any pipeline where inputs arrive faster than a one-at-a-time synchronous loop can handle.
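A rough way to judge whether async is worth it is to compare per-frame cost in the two modes. The sketch below uses an idealized two-stage model, which is an assumption and not a measurement of any real device: a synchronous loop pays for input preparation plus inference on every frame, while a fully overlapped pipeline is limited by the slower stage. The function name and the timings are hypothetical.

```python
def frames_per_second(produce_ms, infer_ms, overlapped):
    """Idealized steady-state throughput for a two-stage pipeline."""
    if overlapped:
        per_frame_ms = max(produce_ms, infer_ms)  # the slower stage dominates
    else:
        per_frame_ms = produce_ms + infer_ms      # stages run back to back
    return 1000.0 / per_frame_ms


if __name__ == "__main__":
    # Hypothetical timings chosen for illustration only.
    print(frames_per_second(5.0, 20.0, overlapped=False))  # 40.0
    print(frames_per_second(5.0, 20.0, overlapped=True))   # 50.0
```

Under this model the gain is largest when the two stages take similar time and shrinks as one stage comes to dominate.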
Prerequisites
Chapter 001. Familiarity with pyneat.Model and model.run() is assumed.
Learning Process
- Prepare runtime inputs: parse CLI args, load ResNet50 MPK, and construct local input samples.
- Build the async run path and split responsibilities between producer push(...) and consumer pull(...).
- Observe queue-driven behavior and verify throughput-oriented execution.
- Validate results with the top-1 output, the async stats (pushed and pulled counts), and the stable tutorial signature ([OK] 002_run_inference_async).
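The validation step can be sketched in plain Python. This mirrors the logic of the C++ listing in the Code section (argmax over at most 1000 scores, then a pushed/pulled count check) but operates on an ordinary list of floats. The helpers `top1` and `check_counts` are hypothetical names and do not use any pyneat tensor API.

```python
def top1(scores, max_classes=1000):
    """Index of the largest score among the first max_classes entries."""
    if not scores:
        raise ValueError("no tensor output")
    limit = min(len(scores), max_classes)
    # max() keeps the first index on ties, matching a strict '>' comparison.
    return max(range(limit), key=lambda i: scores[i])


def check_counts(pushed, pulled):
    """Raise if any pushed input did not yield a pulled output."""
    if pulled != pushed:
        raise RuntimeError(f"pulled={pulled} != pushed={pushed}")
    return "[OK] 002_run_inference_async"


if __name__ == "__main__":
    print("top1=" + str(top1([0.1, 0.9, 0.3])))  # top1=1
    print(check_counts(pushed=4, pulled=4))
```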
Run
Python:
python3 share/sima-neat/tutorials/002_run_inference_async/run_inference_async.py \
--mpk /tmp/resnet_50_mpk.tar.gz --n 4
C++ (prebuilt):
./lib/sima-neat/tutorials/tutorial_002_run_inference_async \
--mpk /tmp/resnet_50_mpk.tar.gz --n 4
C++ (build from source):
./build.sh --target tutorial_002_run_inference_async
./build/tutorials-standalone/tutorial_002_run_inference_async \
--mpk /tmp/resnet_50_mpk.tar.gz --n 4
To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.
Code
// Async push/pull: producer thread pushes frames, main thread pulls outputs.
//
// Usage:
// tutorial_002_run_inference_async --mpk /path/to/resnet_50.tar.gz [--image /path/to.jpg] [--n 4]
#include "neat.h"
#include <opencv2/core.hpp>
#include <opencv2/imgcodecs.hpp>
#include <opencv2/imgproc.hpp>
#include <atomic>
#include <cstring>
#include <exception>
#include <filesystem>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
namespace fs = std::filesystem;
namespace {

// Look up "--key value" on the command line; returns false if the key is absent.
bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
  for (int i = 1; i + 1 < argc; ++i) {
    if (key == argv[i]) {
      out = argv[i + 1];
      return true;
    }
  }
  return false;
}

int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
  std::string value;
  if (!get_arg(argc, argv, key, value))
    return def;
  return std::stoi(value);
}

// Load an image, resize it to size x size, and convert BGR to RGB.
cv::Mat load_rgb(const fs::path& image_path, int size) {
  cv::Mat bgr = cv::imread(image_path.string(), cv::IMREAD_COLOR);
  if (bgr.empty())
    throw std::runtime_error("failed to read image: " + image_path.string());
  if (bgr.cols != size || bgr.rows != size) {
    cv::resize(bgr, bgr, cv::Size(size, size), 0, 0, cv::INTER_AREA);
  }
  cv::Mat rgb;
  cv::cvtColor(bgr, rgb, cv::COLOR_BGR2RGB);
  if (!rgb.isContinuous())
    rgb = rgb.clone();
  return rgb;
}

simaai::neat::Model::Options build_options(int size) {
  simaai::neat::Model::Options opt;
  opt.preprocess.color_convert.input_format = simaai::neat::PreprocessColorFormat::RGB;
  opt.preprocess.input_max_width = size;
  opt.preprocess.input_max_height = size;
  opt.preprocess.input_max_depth = 3;
  opt.preprocess.normalize.mean = {0.485f, 0.456f, 0.406f};
  opt.preprocess.normalize.stddev = {0.229f, 0.224f, 0.225f};
  return opt;
}

int top1_from_output(const simaai::neat::Sample& out) {
  // Extract the tensors once and keep them alive for as long as the mapping is read.
  auto tensors = simaai::neat::tensors_from_sample(out, true);
  if (tensors.empty())
    throw std::runtime_error("no tensor output");
  const simaai::neat::Mapping m = tensors.front().map_read();
  const size_t n = m.size_bytes / sizeof(float);
  if (n == 0)
    throw std::runtime_error("empty tensor output");
  const float* p = reinterpret_cast<const float*>(m.data);
  // Argmax over at most the first 1000 scores.
  int best = 0;
  for (size_t i = 1; i < n && i < 1000; ++i) {
    if (p[i] > p[best])
      best = static_cast<int>(i);
  }
  return best;
}
} // namespace
int main(int argc, char** argv) {
  try {
    std::string mpk, image;
    if (!get_arg(argc, argv, "--mpk", mpk)) {
      std::cerr
          << "Usage: tutorial_002_run_inference_async --mpk <path> [--image <path>] [--n <n>]\n";
      return 1;
    }
    get_arg(argc, argv, "--image", image);
    const int n = parse_int_arg(argc, argv, "--n", 4);
    // frames.front() below requires at least one frame.
    if (n < 1)
      throw std::runtime_error("--n must be at least 1");
    const int size = 224;
    cv::Mat frame = image.empty() ? cv::Mat(size, size, CV_8UC3, cv::Scalar(99, 99, 99))
                                  : load_rgb(image, size);
    std::vector<cv::Mat> frames(n, frame);

    // CORE LOGIC
    // Build a Session around the model and run it async: one producer thread pushes,
    // the main thread pulls outputs.
    simaai::neat::Model model(mpk, build_options(size));
    simaai::neat::Session session;
    session.add(model.session());
    auto run = session.build(std::vector<cv::Mat>{frames.front()}, simaai::neat::RunMode::Async);

    std::atomic<int> pushed{0};
    std::atomic<bool> producer_done{false};
    std::thread producer([&]() {
      for (const cv::Mat& f : frames) {
        run.push(std::vector<cv::Mat>{f});
        pushed.fetch_add(1, std::memory_order_relaxed);
      }
      run.close_input();
      producer_done.store(true);
    });

    // Capture any exception from the pull loop so the producer is always joined;
    // destroying a joinable std::thread would call std::terminate.
    int pulled = 0;
    std::exception_ptr pull_error;
    try {
      while (pulled < n) {
        auto out = run.pull(/*timeout_ms=*/2000);
        if (!out.has_value()) {
          // Timed out: stop once the producer has finished, otherwise keep waiting.
          if (producer_done.load())
            break;
          continue;
        }
        std::cout << "top1=" << top1_from_output(*out) << "\n";
        ++pulled;
      }
    } catch (...) {
      pull_error = std::current_exception();
    }
    producer.join();
    if (pull_error)
      std::rethrow_exception(pull_error);

    std::cout << "pushed=" << pushed.load() << " pulled=" << pulled << "\n";
    if (pulled != n)
      throw std::runtime_error("pulled=" + std::to_string(pulled) +
                               " != pushed=" + std::to_string(pushed.load()));
    std::cout << "[OK] 002_run_inference_async\n";
    return 0;
  } catch (const std::exception& e) {
    std::cerr << "[FAIL] " << e.what() << "\n";
    return 1;
  }
}