Skip to main content

JoinBundle.h File

Stage executor that joins multiple inputs into a Bundle sample. More...

Included Headers

#include "graph/StageExecutor.h" #include "graph/nodes/StageNode.h" #include <cstddef> #include <cstdint> #include <deque> #include <memory> #include <string> #include <unordered_map> #include <unordered_set> #include <utility> #include <vector>

Namespaces Index

namespacesimaai
namespaceneat
namespacegraph
namespacenodes

Classes Index

structJoinBundleOptions

Configuration for a JoinBundle stage executor. More...

classJoinBundle

Stage executor that joins samples from multiple input ports into a single bundled output. More...

structPending

Per-key in-flight bundle being assembled — one entry per pending join key. More...

Description

Stage executor that joins multiple inputs into a Bundle sample.

File Listing

The file content with the documentation metadata removed is:

1
6#pragma once
7
10
11#include <cstddef>
12#include <cstdint>
13#include <deque>
14#include <memory>
15#include <string>
16#include <unordered_map>
17#include <unordered_set>
18#include <utility>
19#include <vector>
20
22
32enum class JoinKeyPolicy {
33 StreamFrame = 0,
35};
36
46 std::vector<std::string> inputs;
47 std::unordered_set<std::string> required;
49 bool emit_partial = false;
50 std::size_t max_pending_keys =
51 4096;
52 int timeout_ms = 0;
53};
54
68public:
71
73 void set_ports(const StagePorts& ports) override;
75 void on_input(StageMsg&& msg, std::vector<StageOutMsg>& out) override;
77 void on_tick(std::int64_t now_ns, std::vector<StageOutMsg>& out) override;
78
79private:
81 struct Pending {
82 std::unordered_map<PortId, Sample>
83 samples;
84 std::int64_t last_seen_ns = 0;
85 };
86
87 std::string make_key_(const Sample& sample) const;
88 void touch_key_(const std::string& key);
89 void evict_expired_(std::int64_t now_ns);
90 void evict_oldest_();
91 bool ready_(const Pending& pending) const;
92 void erase_key_(const std::string& key);
93
95 std::vector<std::string> input_names_;
96 std::unordered_map<PortId, std::string> port_names_;
97 std::unordered_map<std::string, PortId> name_to_port_;
98 std::unordered_set<PortId> required_ports_;
99 std::unordered_map<std::string, Pending> pending_;
100 std::deque<std::string> order_;
101 PortId out_port_ = kInvalidPort;
102};
103
113std::shared_ptr<simaai::neat::graph::Node> JoinBundleNode(std::vector<std::string> inputs,
114 std::string label = {},
115 std::string output = "bundle",
116 JoinBundleOptions opt = {});
117
118} // namespace simaai::neat::graph::nodes

Generated via doxygen2docusaurus 2.0.0 by Doxygen 1.9.8.