AI Watch A1
Multi-person 3D skeleton detection using Intel RealSense and OpenPose with Kafka support.
KafkaManager Class Reference

KafkaManager is responsible for sending all generated output data to the next AI Watch module via Apache Kafka.

#include <KafkaManager.hpp>

Public Member Functions

 KafkaManager (const char *topic)
 Construct a new KafkaManager object.
 
 ~KafkaManager (void)
 Destroy the KafkaManager object.
 
void sendData (const char *key, Json::Value root)
 Send the data contained in root via Kafka, using key as the message key.
 

Private Member Functions

void setProducer (rd_kafka_t *producer)
 Set the Kafka producer's instance.
 
void setConfiguration (rd_kafka_conf_t *configuration)
 Set the Kafka configuration.
 
rd_kafka_t * getProducer (void)
 Get the Kafka producer's instance.
 
rd_kafka_conf_t * getConfiguration (void)
 Get the Kafka configuration.
 
const char * getTopic (void)
 Get the destination Kafka topic.
 
void loadConfigurationGroup (rd_kafka_conf_t *conf, GKeyFile *key_file, const char *group)
 Loads a configuration group from the Kafka configuration file.
 
int getArraySize (const char **array)
 Get the "const char *" array size.
 
void setupProducer (void)
 Loads the configuration and, once all configuration parameters have been read, creates and sets the producer instance.
 

Static Private Member Functions

static void dr_msg_cb (rd_kafka_t *kafka_handle, const rd_kafka_message_t *rkmessage, void *opaque)
 Optional per-message delivery callback, triggered by poll() or flush() once a message has been successfully delivered or has permanently failed delivery (after retries).
 

Private Attributes

rd_kafka_t * producer
 Kafka producer's instance.
 
rd_kafka_conf_t * configuration
 Kafka configuration.
 
const char * topic
 Destination Kafka topic.
 

Detailed Description

KafkaManager is responsible for sending all generated output data to the next AI Watch module via Apache Kafka.

Definition at line 26 of file KafkaManager.hpp.

Constructor & Destructor Documentation

◆ KafkaManager()

KafkaManager::KafkaManager ( const char *  topic)
inline

Construct a new KafkaManager object.

Parameters
topic    Destination Kafka topic.

Definition at line 107 of file KafkaManager.hpp.

107 : topic(topic) {
108 setupProducer();
109 }

References setupProducer().

◆ ~KafkaManager()

KafkaManager::~KafkaManager ( void  )

Destroy the KafkaManager object.

Definition at line 87 of file KafkaManager.cpp.

87 {
88 setConfiguration(NULL);
89 rd_kafka_destroy(getProducer());
90}

References getProducer(), and setConfiguration().

Member Function Documentation

◆ dr_msg_cb()

void KafkaManager::dr_msg_cb ( rd_kafka_t *  kafka_handle,
const rd_kafka_message_t *  rkmessage,
void *  opaque 
)
static private

Optional per-message delivery callback, triggered by poll() or flush() once a message has been successfully delivered or has permanently failed delivery (after retries).

Parameters
kafka_handle
rkmessage
opaque

Definition at line 56 of file KafkaManager.cpp.

56 {
57 if (rkmessage->err) g_error("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
58}

Referenced by setupProducer().
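
librdkafka does not invoke the delivery callback at produce time. It queues delivery reports and serves them from within rd_kafka_poll() or rd_kafka_flush(). The toy model below, written with the standard library only, illustrates that ordering. FakeProducer and its members are invented for this sketch and do not mirror librdkafka's API or internals.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Toy model (not librdkafka): delivery reports are queued at produce time
// and handed to the callback only when poll() is called.
struct FakeProducer {
    // The callback receives the payload and an error flag (false = delivered).
    std::function<void(const std::string &, bool)> onDelivery;
    std::queue<std::pair<std::string, bool>> reports;

    // Pretend to send: record a pending delivery report, call nothing yet.
    void produce(const std::string &payload, bool failed = false) {
        reports.emplace(payload, failed);
    }

    // Serve all queued delivery reports; returns how many were served.
    int poll() {
        int served = 0;
        while (!reports.empty()) {
            if (onDelivery) onDelivery(reports.front().first, reports.front().second);
            reports.pop();
            ++served;
        }
        return served;
    }
};
```

In this model, as with the real callback, a failure is only observed once the application polls or flushes, which is why sendData() calls both after producing.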

◆ getArraySize()

int KafkaManager::getArraySize ( const char **  array)
private

Get the "const char *" array size.

Parameters
array
Returns
int
See also
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Definition at line 52 of file KafkaManager.cpp.

52 {
53 return (array == nullptr || * array == nullptr) ? 0 : (sizeof(array) / sizeof(array[0]));
54}
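
Note that array is a pointer in the listing above, so sizeof(array) / sizeof(array[0]) divides one pointer size by another and yields 1 on typical platforms for any non-empty input. If the intent is to count the entries of a NULL-terminated array, a minimal sketch could look like the following. countEntries is a hypothetical name, not part of KafkaManager, and the sketch assumes the array ends with a nullptr sentinel.

```cpp
#include <cstddef>

// Hypothetical helper (not part of KafkaManager): counts the entries of a
// NULL-terminated array of C strings. Assumes the array ends with nullptr.
std::size_t countEntries(const char *const *array) {
    if (array == nullptr) return 0;
    std::size_t count = 0;
    while (array[count] != nullptr) ++count;
    return count;
}
```

An array without a sentinel cannot be measured through a pointer alone; in that case the length has to be passed alongside it.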

◆ getConfiguration()

rd_kafka_conf_t * KafkaManager::getConfiguration ( void  )
private

Get the Kafka configuration.

Returns
rd_kafka_conf_t *

Definition at line 27 of file KafkaManager.cpp.

27 {
28 return this->configuration;
29}

References configuration.

Referenced by setupProducer().

◆ getProducer()

rd_kafka_t * KafkaManager::getProducer ( void  )
private

Get the Kafka producer's instance.

Returns
rd_kafka_t *

Definition at line 23 of file KafkaManager.cpp.

23 {
24 return this->producer;
25}

References producer.

Referenced by sendData(), setupProducer(), and ~KafkaManager().

◆ getTopic()

const char * KafkaManager::getTopic ( void  )
private

Get the destination Kafka topic.

Returns
const char *

Definition at line 31 of file KafkaManager.cpp.

31 {
32 return this->topic;
33}

References topic.

Referenced by sendData().

◆ loadConfigurationGroup()

void KafkaManager::loadConfigurationGroup ( rd_kafka_conf_t *  conf,
GKeyFile *  key_file,
const char *  group 
)
private

Loads a configuration group from the Kafka configuration file.

Parameters
conf
key_file
group

Definition at line 35 of file KafkaManager.cpp.

35 {
36 char errstr[512];
37 g_autoptr(GError) error = NULL;
38
39 gchar ** ptr = g_key_file_get_keys(key_file, group, NULL, & error);
41
42 while (* ptr) {
43 const char * key = * ptr;
44 g_autofree gchar * value = g_key_file_get_string(key_file, group, key, & error);
46
47 if (rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) CV_Error(RDKAFKA_CONF_SET_ERROR, RDKAFKA_CONF_SET_SCOPE);
48 ptr++;
49 }
50}
static const char * RDKAFKA_GET_KEYS_FROM_JSON_SCOPE
Definition: constants.hpp:39
static const short int RDKAFKA_CONF_SET_ERROR
Definition: constants.hpp:44
static const char * RDKAFKA_GET_KEY_FROM_JSON_SCOPE
Definition: constants.hpp:42
static const short int RDKAFKA_GET_KEYS_FROM_JSON_ERROR
Definition: constants.hpp:38
static const short int RDKAFKA_GET_KEY_FROM_JSON_ERROR
Definition: constants.hpp:41
static const char * RDKAFKA_CONF_SET_SCOPE
Definition: constants.hpp:45

References RDKAFKA_CONF_SET_ERROR, RDKAFKA_CONF_SET_SCOPE, RDKAFKA_GET_KEY_FROM_JSON_ERROR, RDKAFKA_GET_KEY_FROM_JSON_SCOPE, RDKAFKA_GET_KEYS_FROM_JSON_ERROR, and RDKAFKA_GET_KEYS_FROM_JSON_SCOPE.

Referenced by setupProducer().
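
The loop in the listing walks over every key of one group in the configuration file and hands each key/value pair to rd_kafka_conf_set(). As a rough illustration of the data it iterates over, the sketch below collects the pairs of one group from INI-style text using only the standard library. It is a stand-in for GKeyFile, not the project's code: readGroup and the sample text in the usage note are assumptions, and only plain key=value lines are handled.

```cpp
#include <cstddef>
#include <map>
#include <sstream>
#include <string>

// Hypothetical stand-in for GKeyFile: collects the key=value pairs of one
// [group] from INI-style text. Handles only plain "key=value" lines.
std::map<std::string, std::string> readGroup(const std::string &text,
                                             const std::string &group) {
    std::map<std::string, std::string> pairs;
    std::istringstream in(text);
    std::string line;
    bool inGroup = false;
    while (std::getline(in, line)) {
        if (line.empty() || line[0] == '#') continue;  // skip blanks and comments
        if (line.front() == '[' && line.back() == ']') {
            // A new group header: remember whether it is the requested one.
            inGroup = (line.substr(1, line.size() - 2) == group);
            continue;
        }
        const std::size_t eq = line.find('=');
        if (inGroup && eq != std::string::npos)
            pairs[line.substr(0, eq)] = line.substr(eq + 1);
    }
    return pairs;
}
```

Called with a text containing a [default] group and the group name "default", the returned map holds exactly the pairs that the real loop would pass to rd_kafka_conf_set() one by one.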

◆ sendData()

void KafkaManager::sendData ( const char *  key,
Json::Value  root 
)

Send the data contained in root via Kafka, using key as the message key.

Parameters
key
root

Definition at line 92 of file KafkaManager.cpp.

92 {
93 // Produce data
94 std::string keyString = root.toStyledString();
95 const char * value = keyString.c_str();
96 size_t messageSize = (size_t) keyString.size();
97 rd_kafka_resp_err_t err = rd_kafka_producev(getProducer(),
98 RD_KAFKA_V_TOPIC(getTopic()),
99 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
100 RD_KAFKA_V_KEY((void *) key, strlen(key) * sizeof(char)),
101 RD_KAFKA_V_VALUE((void *) value, messageSize),
102 RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END);
103
104 if (err) g_error("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));
105 /* } else {
106 g_message("Produced event to topic %s: key = %12s value = %12s", topic, key, value);
107 }
108 */
109
110 rd_kafka_poll(producer, 0);
111 // Block until the messages are all sent.
112 // g_message("Flushing final messages..");
113 rd_kafka_flush(producer, 10 * 1000);
114
115 if (rd_kafka_outq_len(producer) > 0) g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));
116// g_message("%d events were produced to topic %s.", message_count, topic);
117}

References getProducer(), getTopic(), producer, and topic.

Referenced by FacadeSingleton::sendData().

◆ setConfiguration()

void KafkaManager::setConfiguration ( rd_kafka_conf_t *  configuration)
private

Set the Kafka configuration.

Parameters
configuration    Kafka configuration.

Definition at line 19 of file KafkaManager.cpp.

19 {
20 this->configuration = configuration;
21}

References configuration.

Referenced by setupProducer(), and ~KafkaManager().

◆ setProducer()

void KafkaManager::setProducer ( rd_kafka_t *  producer)
private

Set the Kafka producer's instance.

Parameters
producer    Kafka producer's instance.

Definition at line 15 of file KafkaManager.cpp.

15 {
16 this->producer = producer;
17}

References producer.

Referenced by setupProducer().

◆ setupProducer()

void KafkaManager::setupProducer ( void  )
private

Loads the configuration and, once all configuration parameters have been read, creates and sets the producer instance.

Definition at line 60 of file KafkaManager.cpp.

60 {
61 char errorString[512];
62 const char * configurationFile = "../configuration_file.ini";
63
64 g_autoptr(GError) error = NULL;
65 g_autoptr(GKeyFile) key_file = g_key_file_new();
66 if (!g_key_file_load_from_file (key_file, configurationFile, G_KEY_FILE_NONE, & error)) {
67 g_error ("Error loading config file: %s", error->message);
68 }
69
70 // Load the relevant configuration sections.
71 setConfiguration(rd_kafka_conf_new());
72 loadConfigurationGroup(getConfiguration(), key_file, "default");
73
74 // Install a delivery-error callback.
75 rd_kafka_conf_set_dr_msg_cb(getConfiguration(), dr_msg_cb);
76
77 // Create the Producer instance.
78 setProducer(rd_kafka_new(RD_KAFKA_PRODUCER, getConfiguration(), errorString, sizeof(errorString)));
79 if (!getProducer()) g_error("Failed to create new producer: %s", errorString);
80
81 // Configuration object is now owned, and freed, by the rd_kafka_t instance.
82 setConfiguration(NULL);
83}

References dr_msg_cb(), getConfiguration(), getProducer(), loadConfigurationGroup(), setConfiguration(), and setProducer().

Referenced by KafkaManager().
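
setupProducer() reads the "default" group of ../configuration_file.ini and passes each key to rd_kafka_conf_set(). A minimal file might look like the fragment below. The broker address is a placeholder, and the two properties are standard librdkafka configuration names chosen for illustration; they are not taken from the project's actual file.

```ini
[default]
# Placeholder broker address; replace with the real cluster.
bootstrap.servers=localhost:9092
# Wait for all in-sync replicas to acknowledge each message.
acks=all
```

Because the path is relative, the file is resolved against the process's working directory at the time the KafkaManager is constructed.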

Member Data Documentation

◆ configuration

rd_kafka_conf_t* KafkaManager::configuration
private

Kafka configuration.

Definition at line 35 of file KafkaManager.hpp.

Referenced by getConfiguration(), and setConfiguration().

◆ producer

rd_kafka_t* KafkaManager::producer
private

Kafka producer's instance.

Definition at line 31 of file KafkaManager.hpp.

Referenced by getProducer(), sendData(), and setProducer().

◆ topic

const char* KafkaManager::topic
private

Destination Kafka topic.

Definition at line 39 of file KafkaManager.hpp.

Referenced by getTopic(), and sendData().


The documentation for this class was generated from the following files: