37 g_autoptr(GError) error = NULL;
39 gchar ** ptr = g_key_file_get_keys(key_file, group, NULL, & error);
43 const char * key = * ptr;
44 g_autofree gchar * value = g_key_file_get_string(key_file, group, key, & error);
53 return (array ==
nullptr || * array ==
nullptr) ? 0 : (
sizeof(array) /
sizeof(array[0]));
57 if (rkmessage->err) g_error(
"Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
61 char errorString[512];
62 const char * configurationFile =
"../configuration_file.ini";
64 g_autoptr(GError) error = NULL;
65 g_autoptr(GKeyFile) key_file = g_key_file_new();
66 if (!g_key_file_load_from_file (key_file, configurationFile, G_KEY_FILE_NONE, & error)) {
67 g_error (
"Error loading config file: %s", error->message);
79 if (!
getProducer()) g_error(
"Failed to create new producer: %s", errorString);
94 std::string keyString = root.toStyledString();
95 const char * value = keyString.c_str();
96 size_t messageSize = (size_t) keyString.size();
97 rd_kafka_resp_err_t err = rd_kafka_producev(
getProducer(),
99 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
100 RD_KAFKA_V_KEY((
void *) key, strlen(key) *
sizeof(
char)),
101 RD_KAFKA_V_VALUE((
void *) value, messageSize),
102 RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END);
104 if (err) g_error(
"Failed to produce to topic %s: %s",
topic, rd_kafka_err2str(err));
113 rd_kafka_flush(
producer, 10 * 1000);
115 if (rd_kafka_outq_len(
producer) > 0) g_error(
"%d message(s) were not delivered", rd_kafka_outq_len(
producer));
const char * topic
Destination Kafka topic.
const char * getTopic(void)
Get the destination Kafka topic.
int getArraySize(const char **array)
Get the size of a "const char *" array.
void setupProducer(void)
Launches configuration loading, reads all the configuration parameters, and then sets the produce...
void sendData(const char *key, Json::Value root)
Send data contained in root via Kafka with ID equal to key.
static void dr_msg_cb(rd_kafka_t *kafka_handle, const rd_kafka_message_t *rkmessage, void *opaque)
Optional per-message delivery callback (triggered by poll() or flush()) when a message has been succe...
rd_kafka_t * getProducer(void)
Get the Kafka producer's instance.
~KafkaManager(void)
Destroy the KafkaManager object.
rd_kafka_conf_t * configuration
Kafka's configuration.
void loadConfigurationGroup(rd_kafka_conf_t *conf, GKeyFile *key_file, const char *group)
Loads the configuration from Kafka's configuration file.
rd_kafka_t * producer
Kafka producer's instance.
rd_kafka_conf_t * getConfiguration(void)
Get the Kafka configuration.
void setConfiguration(rd_kafka_conf_t *configuration)
Set the Kafka configuration.
void setProducer(rd_kafka_t *producer)
Set the Kafka producer's instance.
static const char * RDKAFKA_GET_KEYS_FROM_JSON_SCOPE
static const short int RDKAFKA_CONF_SET_ERROR
static const char * RDKAFKA_GET_KEY_FROM_JSON_SCOPE
static const short int RDKAFKA_GET_KEYS_FROM_JSON_ERROR
static const short int RDKAFKA_GET_KEY_FROM_JSON_ERROR
static const char * RDKAFKA_CONF_SET_SCOPE