syslog-ng source
kafka-internal.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2020 Balabit
3  * Copyright (c) 2020 Balazs Scheidler
4  * Copyright (c) 2020 Vivin Peris
5  * Copyright (c) 2025 Hofi <hofione@gmail.com>
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License version 2 as published
9  * by the Free Software Foundation, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  *
20  * As an additional exemption you are allowed to compile & link against the
21  * OpenSSL libraries as published by the OpenSSL project. See the file
22  * COPYING for details.
23  *
24  */
25 
26 #ifndef KAFKA_INTERNAL_H_INCLUDED
27 #define KAFKA_INTERNAL_H_INCLUDED
28 
31 #pragma GCC diagnostic push
32 #pragma GCC diagnostic ignored "-Wignored-qualifiers"
33 #include <librdkafka/rdkafka.h>
34 #pragma GCC diagnostic pop
35 #include "kafka-source-driver.h"
36 #include "kafka-source-worker.h"
37 #include "kafka-dest-driver.h"
38 #include "kafka-dest-worker.h"
39 #include "kafka-source-persist.h"
40 
/*
 * Module-local logging wrappers.
 *
 * When SYSLOG_NG_ENABLE_DEBUG evaluates to non-zero, kafka_msg_debug and
 * kafka_msg_trace are both routed to msg_verbose; otherwise they map to
 * msg_debug and msg_trace respectively.
 *
 * kafka_msg_deep_trace is only live when SYSLOG_NG_ENABLE_DEBUG and
 * KAFKA_DEEP_TRACE are both non-zero (it then maps to msg_verbose). In every
 * other configuration it is a function-like macro with an empty expansion, so
 * its arguments are discarded by the preprocessor and never evaluated.
 */
41 #define KAFKA_DEEP_TRACE 0
42 #if SYSLOG_NG_ENABLE_DEBUG
43 # define kafka_msg_debug msg_verbose
44 # define kafka_msg_trace msg_verbose
45 # if KAFKA_DEEP_TRACE
46 # define kafka_msg_deep_trace msg_verbose
47 # else
/* Debug build, deep tracing off: compile the call sites away. */
48 # define kafka_msg_deep_trace(msg, ...)
49 # endif
50 #else
51 # define kafka_msg_debug msg_debug
52 # define kafka_msg_trace msg_trace
/* Non-debug build: deep tracing is always compiled away. */
53 # define kafka_msg_deep_trace(msg, ...)
54 #endif
55 
/*
 * Length limits used by this module.
 *
 * MAX_KAFKA_TOPIC_NAME_LEN is hard-coded here rather than queried from
 * librdkafka (see the TODO).
 * MAX_KAFKA_PARTITION_KEY_NAME_LEN reserves room for a topic name plus
 * 1 + 32 further characters for the partition number part.
 * NOTE(review): the "+ 1" is presumably a separator character - confirm
 * against kafka_format_partition_key().
 */
56 #define MAX_KAFKA_TOPIC_NAME_LEN 249 // TODO: get from librdkafka?
57 #define MAX_KAFKA_PARTITION_KEY_NAME_LEN (MAX_KAFKA_TOPIC_NAME_LEN + 1 + 32) /* extra for partition number */
58 
/*
 * Expands to a call to topic_name_error_quark(), which returns a GQuark.
 * NOTE(review): presumably the GError domain for topic validation failures -
 * confirm in kafka-internal.c.
 */
59 #define TOPIC_NAME_ERROR topic_name_error_quark()
60 
61 typedef enum _KafkaLogging
62 {
66 
69 
70 typedef enum _KafkaConnectedState
71 {
74 
77 
78 typedef enum _KafkaTopicError
79 {
85 
/* Returns the GQuark that the TOPIC_NAME_ERROR macro expands to. */
86 GQuark topic_name_error_quark(void);
87 
/*
 * Topic validators. Both return a gboolean and take a GError ** out-parameter.
 * NOTE(review): presumably they return FALSE and fill *error (TOPIC_NAME_ERROR
 * domain, KafkaTopicError codes) when the topic is rejected - confirm in
 * kafka-internal.c.
 */
88 gboolean kafka_validate_topic_pattern(const char *topic, GError **error);
89 gboolean kafka_validate_topic_name(const gchar *name, GError **error);
/*
 * Helpers operating on a librdkafka rd_kafka_conf_t.
 *
 * kafka_conf_get_prop: reads property `name` into the caller-supplied buffer
 *   `dest`; `dest_size` is passed by pointer.
 *   NOTE(review): the parameter list mirrors librdkafka's rd_kafka_conf_get(),
 *   so dest_size is presumably in/out - confirm.
 * kafka_conf_set_prop: sets property `name` to `value` on `conf`.
 * kafka_apply_config_props: takes a GList of properties plus an array of
 *   `protected_properties_num` protected property names.
 *   NOTE(review): presumably protected names may not be overridden by `props`
 *   - confirm in kafka-internal.c.
 *
 * All three return a gboolean; presumably TRUE on success - confirm.
 */
90 gboolean kafka_conf_get_prop(const rd_kafka_conf_t *conf, const gchar *name, gchar *dest, size_t *dest_size);
91 gboolean kafka_conf_set_prop(rd_kafka_conf_t *conf, const gchar *name, const gchar *value);
92 gboolean kafka_apply_config_props(rd_kafka_conf_t *conf, GList *props, gchar **protected_properties,
93  gsize protected_properties_num);
/*
 * Partition helpers for the source driver.
 *
 * kafka_seek_partition takes one partition, a target offset and a timeout in
 * milliseconds; kafka_seek_partitions takes a whole partition list and a
 * timeout.
 * NOTE(review): for the list variant the target offsets are presumably
 * carried inside the list entries - confirm.
 */
94 gboolean kafka_seek_partition(KafkaSourceDriver *self,
95  rd_kafka_topic_partition_t *partition,
96  int64_t offset,
97  int timeout_ms);
98 gboolean kafka_seek_partitions(KafkaSourceDriver *self,
99  rd_kafka_topic_partition_list_t *partitions,
100  int timeout_ms);
/*
 * Takes a topic name, a partition number and a caller-supplied buffer `key` of
 * `key_size` bytes.
 * NOTE(review): presumably formats into `key` and returns it; see
 * MAX_KAFKA_PARTITION_KEY_NAME_LEN for the intended buffer size - confirm.
 */
101 gchar *kafka_format_partition_key(const gchar *topic, int32_t partition, gchar *key, gsize key_size);
102 void kafka_log_partition_list(KafkaSourceDriver *self, const rd_kafka_topic_partition_list_t *partitions);
/*
 * The parameter list matches librdkafka's log callback type
 * (rd_kafka_t handle, level, facility, message).
 * NOTE(review): presumably installed with rd_kafka_conf_set_log_cb() - confirm.
 */
103 void kafka_log_callback(const rd_kafka_t *rkt, int level, const char *fac, const char *msg);
104 
/*
 * Statistics counter registration for the source driver.
 *
 * Both functions are keyed by a `label` / `label_value` pair and take an array
 * of counter names; registration additionally receives the GHashTable to use
 * and a stats `level`, unregistration receives the StatsCounterItem array.
 * NOTE(review): `counter_names` is presumably NULL-terminated (no length is
 * passed) - confirm in kafka-internal.c.
 */
105 void kafka_register_counters(KafkaSourceDriver *self, GHashTable *stats_table, const gchar *label,
106  const gchar *label_value, const gchar **counter_names, gint level);
107 void kafka_unregister_counters(KafkaSourceDriver *self, const gchar *label, const gchar *label_value,
108  StatsCounterItem *counter, const gchar **counter_names);
109 
/*
 * kafka_update_state returns a librdkafka error code (rd_kafka_resp_err_t).
 * NOTE(review): `lock` presumably selects whether the function takes the
 * state lock itself - confirm in kafka-source-driver.c.
 */
110 rd_kafka_resp_err_t kafka_update_state(KafkaSourceDriver *self, gboolean lock);
111 void kafka_final_flush(KafkaSourceDriver *self);
112 
/*
 * Options common to the Kafka source and destination: both option structs in
 * this header embed a KafkaOptions as their `super` member.
 *
 * NOTE(review): this listing is lossy. The cross-reference index for this
 * header also records the members bootstrap_servers (gchar *, line 115),
 * kafka_logging (KafkaLogging, line 117), poll_timeout (gint, line 118) and
 * state_update_timeout (gint, line 119); only `config` is shown here.
 */
113 typedef struct _KafkaOptions
114 {
116  GList *config;
120 } KafkaOptions;
121 
/*
 * KafkaOptions mutators.
 *
 * kafka_options_set_logging is the only setter that returns a value.
 * NOTE(review): presumably FALSE when the `logging` string is not recognised -
 * confirm in kafka-internal.c.
 * NOTE(review): the units of poll_timeout and state_update_timeout are not
 * visible in this header.
 */
124 void kafka_options_merge_config(KafkaOptions *self, GList *props);
125 gboolean kafka_options_set_logging(KafkaOptions *self, const gchar *logging);
126 void kafka_options_set_bootstrap_servers(KafkaOptions *self, const gchar *bootstrap_servers);
127 void kafka_options_set_poll_timeout(KafkaOptions *self, gint poll_timeout);
128 void kafka_options_set_state_update_timeout(KafkaOptions *self, gint state_update_timeout);
129 
/*
 * Connection state record, embedded in KafkaOpaque as its `state` member.
 *
 * NOTE(review): this listing is lossy. The cross-reference index also records
 * the members state (KafkaConnectedState, line 133) and last_error (gint,
 * line 134); `mutex` presumably guards them - confirm.
 */
130 typedef struct _KafkaState
131 {
132  GMutex mutex;
135 } KafkaState;
136 
/*
 * Per-driver context object; both the source and the destination driver
 * structs carry one as their `opaque` member.
 *
 * NOTE(review): this listing is lossy. The cross-reference index also records
 * the members options (KafkaOptions *, line 140) and state (KafkaState,
 * line 141).
 * NOTE(review): presumably handed to librdkafka as the opaque pointer for
 * callbacks - confirm.
 */
137 typedef struct _KafkaOpaque
138 {
139  LogDriver *driver;
142 } KafkaOpaque;
143 
/*
 * KafkaOpaque lifecycle and accessors.
 *
 * NOTE(review): listing lines 147-151 are missing here. The cross-reference
 * index names kafka_opaque_state_lock, kafka_opaque_state_unlock,
 * kafka_opaque_state_get, kafka_opaque_state_set and
 * kafka_opaque_state_get_last_error as further members of this family.
 */
144 void kafka_opaque_init(KafkaOpaque *self, LogDriver *driver, KafkaOptions *options);
145 void kafka_opaque_deinit(KafkaOpaque *self);
146 LogDriver *kafka_opaque_driver(KafkaOpaque *self);
152 void kafka_opaque_state_set_last_error(KafkaOpaque *self, gint error);
153 
154 /* Kafka Source */
155 
156 typedef enum _KafkaSrcConsumerStrategy
157 {
160 
163 
164 typedef enum _KafkaSrcPersistStore
165 {
169 
171 {
/*
 * Body of the Kafka source options struct (used below as KafkaSourceOptions).
 *
 * NOTE(review): the struct header line and most members are missing from this
 * listing. The cross-reference index records, by listing line:
 *   172 KafkaOptions super
 *   174 LogThreadedSourceWorkerOptions *worker_options
 *   176 MsgFormatOptions *format_options
 *   177 gboolean store_kafka_metadata
 *   179 GList *requested_topics
 *   180 KafkaSrcConsumerStrategy strategy_hint
 *   181 gint time_reopen
 *   183 gboolean ignore_saved_bookmarks
 *   184 gboolean disable_bookmarks
 *   185 KafkaSrcPersistStore persist_store
 *   187 guint fetch_retry_delay
 *   189 guint fetch_queue_full_delay
 *   190 gboolean separated_worker_queues
 */
173  /* WARNING: multiple inheritance! */
175 
178 
182 
/* NOTE(review): the unit of fetch_delay is not visible in this header. */
186  guint fetch_delay;
188  guint fetch_limit; // TODO: use together with "queued.max.messages.kbytes", if 0 kafka's own setting is used automatically
191 };
192 
194 {
/*
 * Body of the Kafka source worker struct: a LogThreadedSourceWorker base
 * followed by a fixed 32-byte name buffer.
 * NOTE(review): the struct header line is missing from this listing.
 */
195  LogThreadedSourceWorker super;
196  gchar name[32]; /* see kafka_src_worker_new why a fixed size name buffer */
197 };
198 
/*
 * Takes the generic LogThreadedSourceWorker pointer and returns a const string.
 * NOTE(review): presumably the worker struct's `name` buffer, so the result
 * would be owned by the worker and must not be freed - confirm in
 * kafka-source-worker.c.
 */
199 const gchar *kafka_src_worker_get_name(LogThreadedSourceWorker *worker);
200 
202 {
/*
 * Body of the Kafka source driver struct (the functions below take it as
 * KafkaSourceDriver *self).
 *
 * NOTE(review): the struct header line and several members are missing from
 * this listing. The cross-reference index records, by listing line:
 *   205 KafkaOpaque opaque
 *   213 GList *requested_topics
 *   215 KafkaSrcConsumerStrategy strategy
 *   218 GMutex *queue_cond_mutexes
 *   219 guint allocated_queue_num
 *   222 GMutex partition_assignement_mutex
 *   223 gboolean reassign_signaled
 *   224 gboolean assignement_invalidated_signaled
 *   226 GAtomicCounter running_thread_num
 *   227 GAtomicCounter sleeping_thread_num
 *   230 gboolean all_persists_ready
 */
203  LogThreadedSourceDriver super;
204  KafkaSourceOptions options;
206 
/* librdkafka handles: client instance, its two queues, current assignment. */
207  rd_kafka_t *kafka;
208  rd_kafka_queue_t *consumer_kafka_queue;
209  rd_kafka_queue_t *main_kafka_queue;
210  rd_kafka_topic_partition_list_t *assigned_partitions;
211 
212  gchar *group_id;
214 
/*
 * Message queues with matching condition variables.
 * NOTE(review): presumably parallel arrays of allocated_queue_num entries -
 * confirm.
 */
216  GAsyncQueue **msg_queues;
217  GCond *queue_conds;
220  gchar single_queue_name[64];
221 
225 
228 
/* Persist-state bookkeeping. */
229  GHashTable *persists;
231  const gchar *persist_name;
232 
/* Statistics: per-topic and per-worker tables plus three aggregators. */
233  const gchar *stat_persist_name;
234  GHashTable *stats_topics;
235  GHashTable *stats_workers;
236  StatsAggregator *max_message_size;
237  StatsAggregator *average_messages_size;
238  StatsAggregator *CPS;
239 
240 };
241 
/*
 * KafkaSourceOptions lifecycle. The defaults function also receives the
 * LogThreadedSourceWorkerOptions to associate with the options.
 */
242 void kafka_sd_options_defaults(KafkaSourceOptions *self,
243  LogThreadedSourceWorkerOptions *worker_options);
244 void kafka_sd_options_destroy(KafkaSourceOptions *self);
245 
/* Takes the generic LogDriver pointer rather than KafkaSourceDriver. */
246 gboolean kafka_sd_reopen(LogDriver *s);
247 
/*
 * Source driver queue handling and signalling helpers. The cross-reference
 * index places their definitions in kafka-source-driver.c.
 *
 * The *_queue functions that take a LogThreadedSourceWorker select the queue
 * by worker; kafka_sd_signal_queue_ndx selects it by index instead.
 * NOTE(review): the unit of iteration_sleep_time (a gdouble) is not visible
 * in this header.
 * "assignement" is the spelling used by the identifiers themselves.
 */
248 gboolean kafka_sd_using_queues(KafkaSourceDriver *self);
249 guint kafka_sd_used_queue_num(KafkaSourceDriver *self);
250 guint kafka_sd_worker_queues_len(KafkaSourceDriver *self);
251 GAsyncQueue *kafka_sd_worker_queue(KafkaSourceDriver *self, LogThreadedSourceWorker *worker);
252 void kafka_sd_wait_for_queue(KafkaSourceDriver *self, LogThreadedSourceWorker *worker);
253 void kafka_sd_signal_queue(KafkaSourceDriver *self, LogThreadedSourceWorker *worker);
254 void kafka_sd_signal_queue_ndx(KafkaSourceDriver *self, guint ndx);
255 void kafka_sd_signal_queues(KafkaSourceDriver *self);
256 gboolean kafka_sd_wait_for_queue_processors_to_sleep(KafkaSourceDriver *self, const gdouble iteration_sleep_time,
257  gboolean poll_kafka);
258 void kafka_sd_wait_for_queue_processors_to_exit(KafkaSourceDriver *self, const gdouble iteration_sleep_time);
259 void kafka_sd_drop_queued_messages(KafkaSourceDriver *self);
260 void kafka_sd_wakeup_kafka_queues(KafkaSourceDriver *self);
/* Signal / query pairs for the "reassign" and "assignment invalidated" flags. */
261 void kafka_sd_signal_reassign(KafkaSourceDriver *self);
262 gboolean kafka_sd_reassign_signaled(KafkaSourceDriver *self);
263 void kafka_sd_signal_assignement_invalidated(KafkaSourceDriver *self);
264 gboolean kafka_sd_assignement_invalidated_signaled(KafkaSourceDriver *self);
/*
 * Persist and bookmark helpers. A message position is identified by topic
 * name, partition (int32_t) and offset (int64_t).
 */
265 void kafka_sd_persist_add_msg_bookmark(KafkaSourceDriver *self,
266  AckTracker *ack_tracker,
267  const gchar *msg_topic_name,
268  int32_t msg_partition,
269  int64_t msg_offset);
270 gboolean kafka_sd_store_persist_offset(KafkaSourceDriver *self,
271  KafkaSourcePersist *persist,
272  int64_t offset);
273 gboolean kafka_sd_parallel_processing(KafkaSourceDriver *self);
/* Readiness queries: all persists, or the one for a topic/partition pair. */
274 gboolean kafka_sd_persist_all_ready(KafkaSourceDriver *self);
275 gboolean kafka_sd_persist_is_ready(KafkaSourceDriver *self,
276  const gchar *msg_topic_name,
277  int32_t msg_partition);
278 
/*
 * Statistics updates keyed by message length, topic name and worker index
 * respectively.
 */
279 void kafka_sd_update_msg_length_stats(KafkaSourceDriver *self, gsize len);
280 void kafka_sd_inc_msg_topic_stats(KafkaSourceDriver *self, const gchar *topic);
281 void kafka_sd_update_msg_worker_stats(KafkaSourceDriver *self, gint worker_ndx);
282 
283 /* Kafka Destination */
284 
286 {
/*
 * Body of the Kafka destination worker struct (the functions below take it as
 * KafkaDestWorker *self): a LogThreadedDestWorker base, a poll timer and
 * GString buffers for the key and the message.
 *
 * NOTE(review): the struct header line is missing from this listing; the
 * cross-reference index also records GString *topic_name_buffer (line 291).
 */
287  LogThreadedDestWorker super;
288  struct iv_timer poll_timer;
289  GString *key;
290  GString *message;
292 };
293 
295 {
/*
 * Body of the Kafka destination options struct (used below as
 * KafkaDestinationOptions). Topic name, key and message are LogTemplates.
 *
 * NOTE(review): the struct header line and several members are missing from
 * this listing. The cross-reference index records, by listing line:
 *   296 KafkaOptions super
 *   299 gchar *fallback_topic_name
 *   305 gint flush_timeout_on_shutdown
 *   306 gint flush_timeout_on_reload
 *   308 gboolean transaction_commit
 */
297 
298  LogTemplate *topic_name;
300 
301  LogTemplateOptions template_options;
302  LogTemplate *key;
303  LogTemplate *message;
304 
307 
309 };
310 
312 {
/*
 * Body of the Kafka destination driver struct (the functions below take it as
 * KafkaDestDriver *self).
 *
 * NOTE(review): the struct header line is missing from this listing; the
 * cross-reference index also records KafkaOpaque opaque (line 315) and
 * gboolean transaction_inited (line 323).
 */
313  LogThreadedDestDriver super;
314  KafkaDestinationOptions options;
316 
317  rd_kafka_topic_t *topic;
318  rd_kafka_t *kafka;
319 
/*
 * Table of topics with its own mutex.
 * NOTE(review): presumably a topic name -> rd_kafka_topic_t * cache filled by
 * kafka_dd_query_insert_topic() and guarded by topics_lock - confirm in
 * kafka-dest-driver.c.
 */
320  GHashTable *topics;
321  GMutex topics_lock;
322 
324 };
325 
/*
 * Destination-side topic resolution.
 *
 * The kafka_dest_worker_* functions work per worker; all except
 * kafka_dest_worker_get_literal_topic also take the LogMessage being
 * processed. kafka_dd_query_insert_topic works on the driver and takes a topic
 * name.
 * NOTE(review): ownership of the returned rd_kafka_topic_t * and const gchar *
 * values is not visible in this header.
 */
326 const gchar *kafka_dest_worker_resolve_template_topic_name(KafkaDestWorker *self, LogMessage *msg);
327 rd_kafka_topic_t *kafka_dest_worker_calculate_topic_from_template(KafkaDestWorker *self, LogMessage *msg);
328 rd_kafka_topic_t *kafka_dest_worker_get_literal_topic(KafkaDestWorker *self);
329 rd_kafka_topic_t *kafka_dest_worker_calculate_topic(KafkaDestWorker *self, LogMessage *msg);
330 rd_kafka_topic_t *kafka_dd_query_insert_topic(KafkaDestDriver *self, const gchar *name);
/* Takes the generic LogPipe pointer rather than KafkaDestDriver. */
331 gboolean kafka_dd_init(LogPipe *s);
332 
333 #endif
334 
const gchar * name
Definition: debugger.c:265
KafkaLogging
Definition: kafka-internal.h:62
@ KFL_TRACE_LEVEL
Definition: kafka-internal.h:65
@ KFL_DISABLED
Definition: kafka-internal.h:63
@ KFL_UNKNOWN
Definition: kafka-internal.h:67
@ KFL_KAFKA_LEVEL
Definition: kafka-internal.h:64
LogDriver * kafka_opaque_driver(KafkaOpaque *self)
Definition: kafka-internal.c:431
void kafka_opaque_state_set_last_error(KafkaOpaque *self, gint error)
Definition: kafka-internal.c:469
GQuark topic_name_error_quark(void)
Definition: kafka-internal.c:30
gboolean kafka_seek_partition(KafkaSourceDriver *self, rd_kafka_topic_partition_t *partition, int64_t offset, int timeout_ms)
Definition: kafka-internal.c:199
rd_kafka_topic_t * kafka_dd_query_insert_topic(KafkaDestDriver *self, const gchar *name)
Definition: kafka-dest-driver.c:194
void kafka_sd_persist_add_msg_bookmark(KafkaSourceDriver *self, AckTracker *ack_tracker, const gchar *msg_topic_name, int32_t msg_partition, int64_t msg_offset)
Definition: kafka-source-driver.c:833
void kafka_register_counters(KafkaSourceDriver *self, GHashTable *stats_table, const gchar *label, const gchar *label_value, const gchar **counter_names, gint level)
Definition: kafka-internal.c:271
void kafka_sd_update_msg_worker_stats(KafkaSourceDriver *self, gint worker_ndx)
Definition: kafka-source-driver.c:233
gboolean kafka_options_set_logging(KafkaOptions *self, const gchar *logging)
Definition: kafka-internal.c:381
KafkaSrcConsumerStrategy
Definition: kafka-internal.h:157
@ KSCS_SUBSCRIBE
Definition: kafka-internal.h:159
@ KSCS_ASSIGN
Definition: kafka-internal.h:158
@ KSCS_UNDEFINED
Definition: kafka-internal.h:161
rd_kafka_resp_err_t kafka_update_state(KafkaSourceDriver *self, gboolean lock)
Definition: kafka-source-driver.c:453
gboolean kafka_sd_parallel_processing(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:906
void kafka_final_flush(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:1277
void kafka_opaque_state_set(KafkaOpaque *self, KafkaConnectedState state)
Definition: kafka-internal.c:457
gboolean kafka_sd_persist_all_ready(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:852
gboolean kafka_sd_using_queues(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:900
void kafka_options_defaults(KafkaOptions *self)
Definition: kafka-internal.c:343
void kafka_opaque_init(KafkaOpaque *self, LogDriver *driver, KafkaOptions *options)
Definition: kafka-internal.c:410
gboolean kafka_sd_reassign_signaled(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:928
gboolean kafka_apply_config_props(rd_kafka_conf_t *conf, GList *props, gchar **protected_properties, gsize protected_properties_num)
Definition: kafka-internal.c:176
void kafka_sd_drop_queued_messages(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:1257
void kafka_options_destroy(KafkaOptions *self)
Definition: kafka-internal.c:351
void kafka_sd_update_msg_length_stats(KafkaSourceDriver *self, gsize len)
Definition: kafka-source-driver.c:210
void kafka_sd_options_destroy(KafkaSourceOptions *self)
Definition: kafka-source-driver.c:1965
const gchar * kafka_src_worker_get_name(LogThreadedSourceWorker *worker)
Definition: kafka-source-worker.c:505
void kafka_opaque_state_unlock(KafkaOpaque *self)
Definition: kafka-internal.c:443
gboolean kafka_validate_topic_pattern(const char *topic, GError **error)
Definition: kafka-internal.c:53
void kafka_options_set_bootstrap_servers(KafkaOptions *self, const gchar *bootstrap_servers)
Definition: kafka-internal.c:391
GAsyncQueue * kafka_sd_worker_queue(KafkaSourceDriver *self, LogThreadedSourceWorker *worker)
Definition: kafka-source-driver.c:1156
void kafka_options_set_poll_timeout(KafkaOptions *self, gint poll_timeout)
Definition: kafka-internal.c:398
void kafka_unregister_counters(KafkaSourceDriver *self, const gchar *label, const gchar *label_value, StatsCounterItem *counter, const gchar **counter_names)
Definition: kafka-internal.c:309
KafkaConnectedState kafka_opaque_state_get(KafkaOpaque *self)
Definition: kafka-internal.c:451
guint kafka_sd_used_queue_num(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:894
rd_kafka_topic_t * kafka_dest_worker_calculate_topic_from_template(KafkaDestWorker *self, LogMessage *msg)
Definition: kafka-dest-worker.c:74
gboolean kafka_sd_assignement_invalidated_signaled(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:938
gboolean kafka_sd_store_persist_offset(KafkaSourceDriver *self, KafkaSourcePersist *persist, int64_t offset)
Definition: kafka-source-driver.c:775
KafkaSrcPersistStore
Definition: kafka-internal.h:165
@ KSPS_REMOTE
Definition: kafka-internal.h:167
@ KSPS_LOCAL
Definition: kafka-internal.h:166
void kafka_sd_signal_reassign(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:912
void kafka_options_merge_config(KafkaOptions *self, GList *props)
Definition: kafka-internal.c:362
void kafka_sd_wait_for_queue(KafkaSourceDriver *self, LogThreadedSourceWorker *worker)
Definition: kafka-source-driver.c:1164
KafkaTopicError
Definition: kafka-internal.h:79
@ TOPIC_LENGTH_ZERO
Definition: kafka-internal.h:80
@ TOPIC_EXCEEDS_MAX_LENGTH
Definition: kafka-internal.h:82
@ TOPIC_DOT_TWO_DOTS
Definition: kafka-internal.h:81
@ TOPIC_INVALID_PATTERN
Definition: kafka-internal.h:83
rd_kafka_topic_t * kafka_dest_worker_get_literal_topic(KafkaDestWorker *self)
Definition: kafka-dest-worker.c:85
void kafka_sd_options_defaults(KafkaSourceOptions *self, LogThreadedSourceWorkerOptions *worker_options)
Definition: kafka-source-driver.c:1939
gboolean kafka_sd_persist_is_ready(KafkaSourceDriver *self, const gchar *msg_topic_name, int32_t msg_partition)
Definition: kafka-source-driver.c:877
void kafka_sd_signal_queues(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:1238
void kafka_sd_signal_queue_ndx(KafkaSourceDriver *self, guint ndx)
Definition: kafka-source-driver.c:1224
const gchar * kafka_dest_worker_resolve_template_topic_name(KafkaDestWorker *self, LogMessage *msg)
Definition: kafka-dest-worker.c:50
KafkaConnectedState
Definition: kafka-internal.h:71
@ KFS_UNKNOWN
Definition: kafka-internal.h:75
@ KFS_CONNECTED
Definition: kafka-internal.h:72
@ KFS_DISCONNECTED
Definition: kafka-internal.h:73
gint kafka_opaque_state_get_last_error(KafkaOpaque *self)
Definition: kafka-internal.c:463
gboolean kafka_sd_reopen(LogDriver *s)
Definition: kafka-source-driver.c:1645
rd_kafka_topic_t * kafka_dest_worker_calculate_topic(KafkaDestWorker *self, LogMessage *msg)
Definition: kafka-dest-worker.c:93
void kafka_sd_wait_for_queue_processors_to_exit(KafkaSourceDriver *self, const gdouble iteration_sleep_time)
Definition: kafka-source-driver.c:1184
void kafka_opaque_state_lock(KafkaOpaque *self)
Definition: kafka-internal.c:437
void kafka_log_partition_list(KafkaSourceDriver *self, const rd_kafka_topic_partition_list_t *partitions)
Definition: kafka-internal.c:258
void kafka_sd_signal_queue(KafkaSourceDriver *self, LogThreadedSourceWorker *worker)
Definition: kafka-source-driver.c:1230
gboolean kafka_conf_set_prop(rd_kafka_conf_t *conf, const gchar *name, const gchar *value)
Definition: kafka-internal.c:142
guint kafka_sd_worker_queues_len(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:1247
void kafka_options_set_state_update_timeout(KafkaOptions *self, gint state_update_timeout)
Definition: kafka-internal.c:404
void kafka_log_callback(const rd_kafka_t *rkt, int level, const char *fac, const char *msg)
Definition: kafka-internal.c:112
void kafka_sd_inc_msg_topic_stats(KafkaSourceDriver *self, const gchar *topic)
Definition: kafka-source-driver.c:217
void kafka_sd_wakeup_kafka_queues(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:1308
gboolean kafka_sd_wait_for_queue_processors_to_sleep(KafkaSourceDriver *self, const gdouble iteration_sleep_time, gboolean poll_kafka)
Definition: kafka-source-driver.c:1205
gboolean kafka_seek_partitions(KafkaSourceDriver *self, rd_kafka_topic_partition_list_t *partitions, int timeout_ms)
Definition: kafka-internal.c:230
gboolean kafka_conf_get_prop(const rd_kafka_conf_t *conf, const gchar *name, gchar *dest, size_t *dest_size)
Definition: kafka-internal.c:127
gboolean kafka_validate_topic_name(const gchar *name, GError **error)
Definition: kafka-internal.c:72
void kafka_opaque_deinit(KafkaOpaque *self)
Definition: kafka-internal.c:421
gchar * kafka_format_partition_key(const gchar *topic, int32_t partition, gchar *key, gsize key_size)
Definition: kafka-internal.c:192
gboolean kafka_dd_init(LogPipe *s)
Definition: kafka-dest-driver.c:463
void kafka_sd_signal_assignement_invalidated(KafkaSourceDriver *self)
Definition: kafka-source-driver.c:920
Definition: atomic.h:31
Definition: kafka-internal.h:138
KafkaState state
Definition: kafka-internal.h:141
LogDriver * driver
Definition: kafka-internal.h:139
KafkaOptions * options
Definition: kafka-internal.h:140
Definition: kafka-internal.h:114
KafkaLogging kafka_logging
Definition: kafka-internal.h:117
gint state_update_timeout
Definition: kafka-internal.h:119
gchar * bootstrap_servers
Definition: kafka-internal.h:115
gint poll_timeout
Definition: kafka-internal.h:118
GList * config
Definition: kafka-internal.h:116
Definition: kafka-internal.h:131
gint last_error
Definition: kafka-internal.h:134
GMutex mutex
Definition: kafka-internal.h:132
KafkaConnectedState state
Definition: kafka-internal.h:133
Definition: logthrsourcedrv.h:42
Definition: msg-format.h:69
Definition: stats-counter.h:67
Definition: kafka-internal.h:312
rd_kafka_t * kafka
Definition: kafka-internal.h:318
gboolean transaction_inited
Definition: kafka-internal.h:323
rd_kafka_topic_t * topic
Definition: kafka-internal.h:317
LogThreadedDestDriver super
Definition: kafka-internal.h:313
KafkaOpaque opaque
Definition: kafka-internal.h:315
GMutex topics_lock
Definition: kafka-internal.h:321
KafkaDestinationOptions options
Definition: kafka-internal.h:314
GHashTable * topics
Definition: kafka-internal.h:320
Definition: kafka-internal.h:286
LogThreadedDestWorker super
Definition: kafka-internal.h:287
struct iv_timer poll_timer
Definition: kafka-internal.h:288
GString * topic_name_buffer
Definition: kafka-internal.h:291
GString * message
Definition: kafka-internal.h:290
GString * key
Definition: kafka-internal.h:289
Definition: kafka-internal.h:295
gchar * fallback_topic_name
Definition: kafka-internal.h:299
LogTemplate * topic_name
Definition: kafka-internal.h:298
LogTemplate * message
Definition: kafka-internal.h:303
gint flush_timeout_on_reload
Definition: kafka-internal.h:306
LogTemplate * key
Definition: kafka-internal.h:302
LogTemplateOptions template_options
Definition: kafka-internal.h:301
gboolean transaction_commit
Definition: kafka-internal.h:308
KafkaOptions super
Definition: kafka-internal.h:296
gint flush_timeout_on_shutdown
Definition: kafka-internal.h:305
Definition: kafka-internal.h:202
rd_kafka_queue_t * consumer_kafka_queue
Definition: kafka-internal.h:208
GAsyncQueue ** msg_queues
Definition: kafka-internal.h:216
KafkaSrcConsumerStrategy strategy
Definition: kafka-internal.h:215
gboolean assignement_invalidated_signaled
Definition: kafka-internal.h:224
GAtomicCounter running_thread_num
Definition: kafka-internal.h:226
StatsAggregator * CPS
Definition: kafka-internal.h:238
rd_kafka_topic_partition_list_t * assigned_partitions
Definition: kafka-internal.h:210
StatsAggregator * average_messages_size
Definition: kafka-internal.h:237
KafkaSourceOptions options
Definition: kafka-internal.h:204
GList * requested_topics
Definition: kafka-internal.h:213
rd_kafka_queue_t * main_kafka_queue
Definition: kafka-internal.h:209
GHashTable * stats_workers
Definition: kafka-internal.h:235
rd_kafka_t * kafka
Definition: kafka-internal.h:207
GCond * queue_conds
Definition: kafka-internal.h:217
gchar single_queue_name[64]
Definition: kafka-internal.h:220
GMutex * queue_cond_mutexes
Definition: kafka-internal.h:218
GHashTable * stats_topics
Definition: kafka-internal.h:234
gboolean all_persists_ready
Definition: kafka-internal.h:230
guint allocated_queue_num
Definition: kafka-internal.h:219
gboolean reassign_signaled
Definition: kafka-internal.h:223
GAtomicCounter sleeping_thread_num
Definition: kafka-internal.h:227
const gchar * persist_name
Definition: kafka-internal.h:231
gchar * group_id
Definition: kafka-internal.h:212
GMutex partition_assignement_mutex
Definition: kafka-internal.h:222
StatsAggregator * max_message_size
Definition: kafka-internal.h:236
LogThreadedSourceDriver super
Definition: kafka-internal.h:203
const gchar * stat_persist_name
Definition: kafka-internal.h:233
GHashTable * persists
Definition: kafka-internal.h:229
KafkaOpaque opaque
Definition: kafka-internal.h:205
Definition: kafka-internal.h:171
guint fetch_delay
Definition: kafka-internal.h:186
KafkaOptions super
Definition: kafka-internal.h:172
guint fetch_queue_full_delay
Definition: kafka-internal.h:189
MsgFormatOptions * format_options
Definition: kafka-internal.h:176
gboolean disable_bookmarks
Definition: kafka-internal.h:184
KafkaSrcPersistStore persist_store
Definition: kafka-internal.h:185
gboolean ignore_saved_bookmarks
Definition: kafka-internal.h:183
KafkaSrcConsumerStrategy strategy_hint
Definition: kafka-internal.h:180
guint fetch_retry_delay
Definition: kafka-internal.h:187
gint time_reopen
Definition: kafka-internal.h:181
LogThreadedSourceWorkerOptions * worker_options
Definition: kafka-internal.h:174
GList * requested_topics
Definition: kafka-internal.h:179
gboolean separated_worker_queues
Definition: kafka-internal.h:190
guint fetch_limit
Definition: kafka-internal.h:188
gboolean store_kafka_metadata
Definition: kafka-internal.h:177
Definition: kafka-internal.h:194
LogThreadedSourceWorker super
Definition: kafka-internal.h:195
gchar name[32]
Definition: kafka-internal.h:196
GString * value
Definition: test_decode.c:28
HTTPDestinationDriver * driver
Definition: test_http-signal_slot.c:35
LogMessage * msg
Definition: test_rename.c:35
struct @95 state
struct tm key
Definition: cache.c:63