26 #ifndef KAFKA_INTERNAL_H_INCLUDED
27 #define KAFKA_INTERNAL_H_INCLUDED
31 #pragma GCC diagnostic push
32 #pragma GCC diagnostic ignored "-Wignored-qualifiers"
33 #include <librdkafka/rdkafka.h>
34 #pragma GCC diagnostic pop
41 #define KAFKA_DEEP_TRACE 0
42 #if SYSLOG_NG_ENABLE_DEBUG
43 # define kafka_msg_debug msg_verbose
44 # define kafka_msg_trace msg_verbose
46 # define kafka_msg_deep_trace msg_verbose
48 # define kafka_msg_deep_trace(msg, ...)
51 # define kafka_msg_debug msg_debug
52 # define kafka_msg_trace msg_trace
53 # define kafka_msg_deep_trace(msg, ...)
56 #define MAX_KAFKA_TOPIC_NAME_LEN 249
57 #define MAX_KAFKA_PARTITION_KEY_NAME_LEN (MAX_KAFKA_TOPIC_NAME_LEN + 1 + 32)
59 #define TOPIC_NAME_ERROR topic_name_error_quark()
61 typedef enum _KafkaLogging
70 typedef enum _KafkaConnectedState
78 typedef enum _KafkaTopicError
93 gsize protected_properties_num);
95 rd_kafka_topic_partition_t *partition,
99 rd_kafka_topic_partition_list_t *partitions,
106 const gchar *label_value,
const gchar **counter_names, gint level);
113 typedef struct _KafkaOptions
130 typedef struct _KafkaState
137 typedef struct _KafkaOpaque
156 typedef enum _KafkaSrcConsumerStrategy
164 typedef enum _KafkaSrcPersistStore
257 gboolean poll_kafka);
266 AckTracker *ack_tracker,
267 const gchar *msg_topic_name,
268 int32_t msg_partition,
271 KafkaSourcePersist *persist,
276 const gchar *msg_topic_name,
277 int32_t msg_partition);
const gchar * name
Definition: debugger.c:265
KafkaLogging
Definition: kafka-internal.h:62
@ KFL_TRACE_LEVEL
Definition: kafka-internal.h:65
@ KFL_DISABLED
Definition: kafka-internal.h:63
@ KFL_UNKNOWN
Definition: kafka-internal.h:67
@ KFL_KAFKA_LEVEL
Definition: kafka-internal.h:64
LogDriver * kafka_opaque_driver(KafkaOpaque *self)
Definition: kafka-internal.c:431
void kafka_opaque_state_set_last_error(KafkaOpaque *self, gint error)
Definition: kafka-internal.c:469
GQuark topic_name_error_quark(void)
Definition: kafka-internal.c:30
gboolean kafka_seek_partition(KafkaSourceDriver *self, rd_kafka_topic_partition_t *partition, int64_t offset, int timeout_ms)
Definition: kafka-internal.c:199
rd_kafka_topic_t * kafka_dd_query_insert_topic(KafkaDestDriver *self, const gchar *name)
Definition: kafka-dest-driver.c:194
void kafka_sd_persist_add_msg_bookmark(KafkaSourceDriver *self, AckTracker *ack_tracker, const gchar *msg_topic_name, int32_t msg_partition, int64_t msg_offset)
Definition: kafka-source-driver.c:833
void kafka_register_counters(KafkaSourceDriver *self, GHashTable *stats_table, const gchar *label, const gchar *label_value, const gchar **counter_names, gint level)
Definition: kafka-internal.c:271
void kafka_sd_update_msg_worker_stats(KafkaSourceDriver *self, gint worker_ndx)
Definition: kafka-source-driver.c:233
gboolean kafka_options_set_logging(KafkaOptions *self, const gchar *logging)
Definition: kafka-internal.c:381
KafkaSrcConsumerStrategy
Definition: kafka-internal.h:157
@ KSCS_SUBSCRIBE
Definition: kafka-internal.h:159
@ KSCS_ASSIGN
Definition: kafka-internal.h:158
@ KSCS_UNDEFINED
Definition: kafka-internal.h:161
rd_kafka_resp_err_t kafka_update_state(KafkaSourceDriver *self, gboolean lock)
Definition: kafka-source-driver.c:453
gboolean kafka_sd_parallel_processing(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:906
void kafka_final_flush(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:1277
void kafka_opaque_state_set(KafkaOpaque *self, KafkaConnectedState state)
Definition: kafka-internal.c:457
gboolean kafka_sd_persist_all_ready(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:852
gboolean kafka_sd_using_queues(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:900
void kafka_options_defaults(KafkaOptions *self)
Definition: kafka-internal.c:343
void kafka_opaque_init(KafkaOpaque *self, LogDriver *driver, KafkaOptions *options)
Definition: kafka-internal.c:410
gboolean kafka_sd_reassign_signaled(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:928
gboolean kafka_apply_config_props(rd_kafka_conf_t *conf, GList *props, gchar **protected_properties, gsize protected_properties_num)
Definition: kafka-internal.c:176
void kafka_sd_drop_queued_messages(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:1257
void kafka_options_destroy(KafkaOptions *self)
Definition: kafka-internal.c:351
void kafka_sd_update_msg_length_stats(KafkaSourceDriver *self, gsize len)
Definition: kafka-source-driver.c:210
void kafka_sd_options_destroy(KafkaSourceOptions *self)
Definition: kafka-source-driver.c:1965
const gchar * kafka_src_worker_get_name(LogThreadedSourceWorker *worker)
Definition: kafka-source-worker.c:505
void kafka_opaque_state_unlock(KafkaOpaque *self)
Definition: kafka-internal.c:443
gboolean kafka_validate_topic_pattern(const char *topic, GError **error)
Definition: kafka-internal.c:53
void kafka_options_set_bootstrap_servers(KafkaOptions *self, const gchar *bootstrap_servers)
Definition: kafka-internal.c:391
GAsyncQueue * kafka_sd_worker_queue(KafkaSourceDriver *self, LogThreadedSourceWorker *worker)
Definition: kafka-source-driver.c:1156
void kafka_options_set_poll_timeout(KafkaOptions *self, gint poll_timeout)
Definition: kafka-internal.c:398
void kafka_unregister_counters(KafkaSourceDriver *self, const gchar *label, const gchar *label_value, StatsCounterItem *counter, const gchar **counter_names)
Definition: kafka-internal.c:309
KafkaConnectedState kafka_opaque_state_get(KafkaOpaque *self)
Definition: kafka-internal.c:451
guint kafka_sd_used_queue_num(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:894
rd_kafka_topic_t * kafka_dest_worker_calculate_topic_from_template(KafkaDestWorker *self, LogMessage *msg)
Definition: kafka-dest-worker.c:74
gboolean kafka_sd_assignement_invalidated_signaled(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:938
gboolean kafka_sd_store_persist_offset(KafkaSourceDriver *self, KafkaSourcePersist *persist, int64_t offset)
Definition: kafka-source-driver.c:775
KafkaSrcPersistStore
Definition: kafka-internal.h:165
@ KSPS_REMOTE
Definition: kafka-internal.h:167
@ KSPS_LOCAL
Definition: kafka-internal.h:166
void kafka_sd_signal_reassign(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:912
void kafka_options_merge_config(KafkaOptions *self, GList *props)
Definition: kafka-internal.c:362
void kafka_sd_wait_for_queue(KafkaSourceDriver *self, LogThreadedSourceWorker *worker)
Definition: kafka-source-driver.c:1164
KafkaTopicError
Definition: kafka-internal.h:79
@ TOPIC_LENGTH_ZERO
Definition: kafka-internal.h:80
@ TOPIC_EXCEEDS_MAX_LENGTH
Definition: kafka-internal.h:82
@ TOPIC_DOT_TWO_DOTS
Definition: kafka-internal.h:81
@ TOPIC_INVALID_PATTERN
Definition: kafka-internal.h:83
rd_kafka_topic_t * kafka_dest_worker_get_literal_topic(KafkaDestWorker *self)
Definition: kafka-dest-worker.c:85
void kafka_sd_options_defaults(KafkaSourceOptions *self, LogThreadedSourceWorkerOptions *worker_options)
Definition: kafka-source-driver.c:1939
gboolean kafka_sd_persist_is_ready(KafkaSourceDriver *self, const gchar *msg_topic_name, int32_t msg_partition)
Definition: kafka-source-driver.c:877
void kafka_sd_signal_queues(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:1238
void kafka_sd_signal_queue_ndx(KafkaSourceDriver *self, guint ndx)
Definition: kafka-source-driver.c:1224
const gchar * kafka_dest_worker_resolve_template_topic_name(KafkaDestWorker *self, LogMessage *msg)
Definition: kafka-dest-worker.c:50
KafkaConnectedState
Definition: kafka-internal.h:71
@ KFS_UNKNOWN
Definition: kafka-internal.h:75
@ KFS_CONNECTED
Definition: kafka-internal.h:72
@ KFS_DISCONNECTED
Definition: kafka-internal.h:73
gint kafka_opaque_state_get_last_error(KafkaOpaque *self)
Definition: kafka-internal.c:463
gboolean kafka_sd_reopen(LogDriver *s)
Definition: kafka-source-driver.c:1645
rd_kafka_topic_t * kafka_dest_worker_calculate_topic(KafkaDestWorker *self, LogMessage *msg)
Definition: kafka-dest-worker.c:93
void kafka_sd_wait_for_queue_processors_to_exit(KafkaSourceDriver *self, const gdouble iteration_sleep_time)
Definition: kafka-source-driver.c:1184
void kafka_opaque_state_lock(KafkaOpaque *self)
Definition: kafka-internal.c:437
void kafka_log_partition_list(KafkaSourceDriver *self, const rd_kafka_topic_partition_list_t *partitions)
Definition: kafka-internal.c:258
void kafka_sd_signal_queue(KafkaSourceDriver *self, LogThreadedSourceWorker *worker)
Definition: kafka-source-driver.c:1230
gboolean kafka_conf_set_prop(rd_kafka_conf_t *conf, const gchar *name, const gchar *value)
Definition: kafka-internal.c:142
guint kafka_sd_worker_queues_len(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:1247
void kafka_options_set_state_update_timeout(KafkaOptions *self, gint state_update_timeout)
Definition: kafka-internal.c:404
void kafka_log_callback(const rd_kafka_t *rkt, int level, const char *fac, const char *msg)
Definition: kafka-internal.c:112
void kafka_sd_inc_msg_topic_stats(KafkaSourceDriver *self, const gchar *topic)
Definition: kafka-source-driver.c:217
void kafka_sd_wakeup_kafka_queues(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:1308
gboolean kafka_sd_wait_for_queue_processors_to_sleep(KafkaSourceDriver *self, const gdouble iteration_sleep_time, gboolean poll_kafka)
Definition: kafka-source-driver.c:1205
gboolean kafka_seek_partitions(KafkaSourceDriver *self, rd_kafka_topic_partition_list_t *partitions, int timeout_ms)
Definition: kafka-internal.c:230
gboolean kafka_conf_get_prop(const rd_kafka_conf_t *conf, const gchar *name, gchar *dest, size_t *dest_size)
Definition: kafka-internal.c:127
gboolean kafka_validate_topic_name(const gchar *name, GError **error)
Definition: kafka-internal.c:72
void kafka_opaque_deinit(KafkaOpaque *self)
Definition: kafka-internal.c:421
gchar * kafka_format_partition_key(const gchar *topic, int32_t partition, gchar *key, gsize key_size)
Definition: kafka-internal.c:192
gboolean kafka_dd_init(LogPipe *s)
Definition: kafka-dest-driver.c:463
void kafka_sd_signal_assignement_invalidated(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:920
Definition: kafka-internal.h:138
KafkaState state
Definition: kafka-internal.h:141
LogDriver * driver
Definition: kafka-internal.h:139
KafkaOptions * options
Definition: kafka-internal.h:140
Definition: kafka-internal.h:114
KafkaLogging kafka_logging
Definition: kafka-internal.h:117
gint state_update_timeout
Definition: kafka-internal.h:119
gchar * bootstrap_servers
Definition: kafka-internal.h:115
gint poll_timeout
Definition: kafka-internal.h:118
GList * config
Definition: kafka-internal.h:116
Definition: kafka-internal.h:131
gint last_error
Definition: kafka-internal.h:134
GMutex mutex
Definition: kafka-internal.h:132
KafkaConnectedState state
Definition: kafka-internal.h:133
Definition: logthrsourcedrv.h:42
Definition: stats-counter.h:67
Definition: kafka-internal.h:312
rd_kafka_t * kafka
Definition: kafka-internal.h:318
gboolean transaction_inited
Definition: kafka-internal.h:323
rd_kafka_topic_t * topic
Definition: kafka-internal.h:317
LogThreadedDestDriver super
Definition: kafka-internal.h:313
KafkaOpaque opaque
Definition: kafka-internal.h:315
GMutex topics_lock
Definition: kafka-internal.h:321
KafkaDestinationOptions options
Definition: kafka-internal.h:314
GHashTable * topics
Definition: kafka-internal.h:320
Definition: kafka-internal.h:286
LogThreadedDestWorker super
Definition: kafka-internal.h:287
struct iv_timer poll_timer
Definition: kafka-internal.h:288
GString * topic_name_buffer
Definition: kafka-internal.h:291
GString * message
Definition: kafka-internal.h:290
GString * key
Definition: kafka-internal.h:289
Definition: kafka-internal.h:295
gchar * fallback_topic_name
Definition: kafka-internal.h:299
LogTemplate * topic_name
Definition: kafka-internal.h:298
LogTemplate * message
Definition: kafka-internal.h:303
gint flush_timeout_on_reload
Definition: kafka-internal.h:306
LogTemplate * key
Definition: kafka-internal.h:302
LogTemplateOptions template_options
Definition: kafka-internal.h:301
gboolean transaction_commit
Definition: kafka-internal.h:308
KafkaOptions super
Definition: kafka-internal.h:296
gint flush_timeout_on_shutdown
Definition: kafka-internal.h:305
Definition: kafka-internal.h:202
rd_kafka_queue_t * consumer_kafka_queue
Definition: kafka-internal.h:208
GAsyncQueue ** msg_queues
Definition: kafka-internal.h:216
KafkaSrcConsumerStrategy strategy
Definition: kafka-internal.h:215
gboolean assignement_invalidated_signaled
Definition: kafka-internal.h:224
GAtomicCounter running_thread_num
Definition: kafka-internal.h:226
StatsAggregator * CPS
Definition: kafka-internal.h:238
rd_kafka_topic_partition_list_t * assigned_partitions
Definition: kafka-internal.h:210
StatsAggregator * average_messages_size
Definition: kafka-internal.h:237
KafkaSourceOptions options
Definition: kafka-internal.h:204
GList * requested_topics
Definition: kafka-internal.h:213
rd_kafka_queue_t * main_kafka_queue
Definition: kafka-internal.h:209
GHashTable * stats_workers
Definition: kafka-internal.h:235
rd_kafka_t * kafka
Definition: kafka-internal.h:207
GCond * queue_conds
Definition: kafka-internal.h:217
gchar single_queue_name[64]
Definition: kafka-internal.h:220
GMutex * queue_cond_mutexes
Definition: kafka-internal.h:218
GHashTable * stats_topics
Definition: kafka-internal.h:234
gboolean all_persists_ready
Definition: kafka-internal.h:230
guint allocated_queue_num
Definition: kafka-internal.h:219
gboolean reassign_signaled
Definition: kafka-internal.h:223
GAtomicCounter sleeping_thread_num
Definition: kafka-internal.h:227
const gchar * persist_name
Definition: kafka-internal.h:231
gchar * group_id
Definition: kafka-internal.h:212
GMutex partition_assignement_mutex
Definition: kafka-internal.h:222
StatsAggregator * max_message_size
Definition: kafka-internal.h:236
LogThreadedSourceDriver super
Definition: kafka-internal.h:203
const gchar * stat_persist_name
Definition: kafka-internal.h:233
GHashTable * persists
Definition: kafka-internal.h:229
KafkaOpaque opaque
Definition: kafka-internal.h:205
Definition: kafka-internal.h:171
guint fetch_delay
Definition: kafka-internal.h:186
KafkaOptions super
Definition: kafka-internal.h:172
guint fetch_queue_full_delay
Definition: kafka-internal.h:189
MsgFormatOptions * format_options
Definition: kafka-internal.h:176
gboolean disable_bookmarks
Definition: kafka-internal.h:184
KafkaSrcPersistStore persist_store
Definition: kafka-internal.h:185
gboolean ignore_saved_bookmarks
Definition: kafka-internal.h:183
KafkaSrcConsumerStrategy strategy_hint
Definition: kafka-internal.h:180
guint fetch_retry_delay
Definition: kafka-internal.h:187
gint time_reopen
Definition: kafka-internal.h:181
LogThreadedSourceWorkerOptions * worker_options
Definition: kafka-internal.h:174
GList * requested_topics
Definition: kafka-internal.h:179
gboolean separated_worker_queues
Definition: kafka-internal.h:190
guint fetch_limit
Definition: kafka-internal.h:188
gboolean store_kafka_metadata
Definition: kafka-internal.h:177
Definition: kafka-internal.h:194
LogThreadedSourceWorker super
Definition: kafka-internal.h:195
gchar name[32]
Definition: kafka-internal.h:196
GString * value
Definition: test_decode.c:28
HTTPDestinationDriver * driver
Definition: test_http-signal_slot.c:35
LogMessage * msg
Definition: test_rename.c:35
struct tm key
Definition: cache.c:63