AI Watch A1
Multi-person 3D skeleton detection using Intel RealSense and OpenPose with Kafka support.
KafkaManager.cpp
Go to the documentation of this file.
1//
2// KafkaManager.cpp
3// AI Watch A1
4//
5// Created by Denny Caruso on 27/07/22.
6//
7
8// License: Apache 2.0. See LICENSE file in root directory.
9// Copyright(c) 2022. All Rights Reserved.
10
11#include "KafkaManager.hpp"
12
13
14
15void KafkaManager::setProducer (rd_kafka_t * producer) {
16 this->producer = producer;
17}
18
19void KafkaManager::setConfiguration (rd_kafka_conf_t * configuration) {
20 this->configuration = configuration;
21}
22
23rd_kafka_t * KafkaManager::getProducer (void) {
24 return this->producer;
25}
26
27rd_kafka_conf_t * KafkaManager::getConfiguration (void) {
28 return this->configuration;
29}
30
31const char * KafkaManager::getTopic (void) {
32 return this->topic;
33}
34
35void KafkaManager::loadConfigurationGroup(rd_kafka_conf_t * conf, GKeyFile * key_file, const char * group) {
36 char errstr[512];
37 g_autoptr(GError) error = NULL;
38
39 gchar ** ptr = g_key_file_get_keys(key_file, group, NULL, & error);
41
42 while (* ptr) {
43 const char * key = * ptr;
44 g_autofree gchar * value = g_key_file_get_string(key_file, group, key, & error);
46
47 if (rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) CV_Error(RDKAFKA_CONF_SET_ERROR, RDKAFKA_CONF_SET_SCOPE);
48 ptr++;
49 }
50}
51
52int KafkaManager::getArraySize (const char ** array) {
53 return (array == nullptr || * array == nullptr) ? 0 : (sizeof(array) / sizeof(array[0]));
54}
55
56void KafkaManager::dr_msg_cb (rd_kafka_t * kafka_handle, const rd_kafka_message_t * rkmessage, void * opaque) {
57 if (rkmessage->err) g_error("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
58}
59
61 char errorString[512];
62 const char * configurationFile = "../configuration_file.ini";
63
64 g_autoptr(GError) error = NULL;
65 g_autoptr(GKeyFile) key_file = g_key_file_new();
66 if (!g_key_file_load_from_file (key_file, configurationFile, G_KEY_FILE_NONE, & error)) {
67 g_error ("Error loading config file: %s", error->message);
68 }
69
70 // Load the relevant configuration sections.
71 setConfiguration(rd_kafka_conf_new());
72 loadConfigurationGroup(getConfiguration(), key_file, "default");
73
74 // Install a delivery-error callback.
75 rd_kafka_conf_set_dr_msg_cb(getConfiguration(), dr_msg_cb);
76
77 // Create the Producer instance.
78 setProducer(rd_kafka_new(RD_KAFKA_PRODUCER, getConfiguration(), errorString, sizeof(errorString)));
79 if (!getProducer()) g_error("Failed to create new producer: %s", errorString);
80
81 // Configuration object is now owned, and freed, by the rd_kafka_t instance.
82 setConfiguration(NULL);
83}
84
85
86
88 setConfiguration(NULL);
89 rd_kafka_destroy(getProducer());
90}
91
92void KafkaManager::sendData (const char * key, Json::Value root) {
93 // Produce data
94 std::string keyString = root.toStyledString();
95 const char * value = keyString.c_str();
96 size_t messageSize = (size_t) keyString.size();
97 rd_kafka_resp_err_t err = rd_kafka_producev(getProducer(),
98 RD_KAFKA_V_TOPIC(getTopic()),
99 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
100 RD_KAFKA_V_KEY((void *) key, strlen(key) * sizeof(char)),
101 RD_KAFKA_V_VALUE((void *) value, messageSize),
102 RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END);
103
104 if (err) g_error("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));
105 /* } else {
106 g_message("Produced event to topic %s: key = %12s value = %12s", topic, key, value);
107 }
108 */
109
110 rd_kafka_poll(producer, 0);
111 // Block until the messages are all sent.
112 // g_message("Flushing final messages..");
113 rd_kafka_flush(producer, 10 * 1000);
114
115 if (rd_kafka_outq_len(producer) > 0) g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));
116// g_message("%d events were produced to topic %s.", message_count, topic);
117}
const char * topic
Destination Kafka's topic.
const char * getTopic(void)
Get the destination Kafka's topic.
int getArraySize(const char **array)
Get the "const char *" array size.
void setupProducer(void)
Launches configuration loading, once reads all the configuration parameters and later set the produce...
void sendData(const char *key, Json::Value root)
Send data contained in root via Kafka with ID equal to key.
static void dr_msg_cb(rd_kafka_t *kafka_handle, const rd_kafka_message_t *rkmessage, void *opaque)
Optional per-message delivery callback (triggered by poll() or flush()) when a message has been succe...
rd_kafka_t * getProducer(void)
Get the Kafka producer's instance.
~KafkaManager(void)
Destroy the KafkaManager object.
rd_kafka_conf_t * configuration
Kafka's configuration.
void loadConfigurationGroup(rd_kafka_conf_t *conf, GKeyFile *key_file, const char *group)
Loads the configuration from Kafka's configuration file.
rd_kafka_t * producer
Kafka producer's instance.
rd_kafka_conf_t * getConfiguration(void)
Get the Kafka's configuration.
void setConfiguration(rd_kafka_conf_t *configuration)
Set the Kafka's configuration.
void setProducer(rd_kafka_t *producer)
Set the Kafka producer's instance.
static const char * RDKAFKA_GET_KEYS_FROM_JSON_SCOPE
Definition: constants.hpp:39
static const short int RDKAFKA_CONF_SET_ERROR
Definition: constants.hpp:44
static const char * RDKAFKA_GET_KEY_FROM_JSON_SCOPE
Definition: constants.hpp:42
static const short int RDKAFKA_GET_KEYS_FROM_JSON_ERROR
Definition: constants.hpp:38
static const short int RDKAFKA_GET_KEY_FROM_JSON_ERROR
Definition: constants.hpp:41
static const char * RDKAFKA_CONF_SET_SCOPE
Definition: constants.hpp:45