syslog-ng source
kafka-internal.h File Reference
#include "logthrsource/logthrfetcherdrv.h"
#include "logthrdest/logthrdestdrv.h"
#include <librdkafka/rdkafka.h>
#include "kafka-source-driver.h"
#include "kafka-source-worker.h"
#include "kafka-dest-driver.h"
#include "kafka-dest-worker.h"
#include "kafka-source-persist.h"
Include dependency graph for kafka-internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Classes

struct  KafkaOptions
 
struct  KafkaState
 
struct  KafkaOpaque
 
struct  KafkaSourceOptions
 
struct  KafkaSourceWorker
 
struct  KafkaSourceDriver
 
struct  KafkaDestWorker
 
struct  KafkaDestinationOptions
 
struct  KafkaDestDriver
 

Macros

#define KAFKA_DEEP_TRACE   0
 
#define kafka_msg_debug   msg_debug
 
#define kafka_msg_trace   msg_trace
 
#define kafka_msg_deep_trace(msg, ...)
 
#define MAX_KAFKA_TOPIC_NAME_LEN   249
 
#define MAX_KAFKA_PARTITION_KEY_NAME_LEN   (MAX_KAFKA_TOPIC_NAME_LEN + 1 + 32) /* extra for partition number */
 
#define TOPIC_NAME_ERROR   topic_name_error_quark()
 

Enumerations

enum  KafkaLogging { KFL_DISABLED , KFL_KAFKA_LEVEL , KFL_TRACE_LEVEL , KFL_UNKNOWN }
 
enum  KafkaConnectedState { KFS_CONNECTED , KFS_DISCONNECTED , KFS_UNKNOWN }
 
enum  KafkaTopicError { TOPIC_LENGTH_ZERO , TOPIC_DOT_TWO_DOTS , TOPIC_EXCEEDS_MAX_LENGTH , TOPIC_INVALID_PATTERN }
 
enum  KafkaSrcConsumerStrategy { KSCS_ASSIGN , KSCS_SUBSCRIBE , KSCS_UNDEFINED }
 
enum  KafkaSrcPersistStore { KSPS_LOCAL , KSPS_REMOTE }
 

Functions

GQuark topic_name_error_quark (void)
 
gboolean kafka_validate_topic_pattern (const char *topic, GError **error)
 
gboolean kafka_validate_topic_name (const gchar *name, GError **error)
 
gboolean kafka_conf_get_prop (const rd_kafka_conf_t *conf, const gchar *name, gchar *dest, size_t *dest_size)
 
gboolean kafka_conf_set_prop (rd_kafka_conf_t *conf, const gchar *name, const gchar *value)
 
gboolean kafka_apply_config_props (rd_kafka_conf_t *conf, GList *props, gchar **protected_properties, gsize protected_properties_num)
 
gboolean kafka_seek_partition (KafkaSourceDriver *self, rd_kafka_topic_partition_t *partition, int64_t offset, int timeout_ms)
 
gboolean kafka_seek_partitions (KafkaSourceDriver *self, rd_kafka_topic_partition_list_t *partitions, int timeout_ms)
 
gchar * kafka_format_partition_key (const gchar *topic, int32_t partition, gchar *key, gsize key_size)
 
void kafka_log_partition_list (KafkaSourceDriver *self, const rd_kafka_topic_partition_list_t *partitions)
 
void kafka_log_callback (const rd_kafka_t *rkt, int level, const char *fac, const char *msg)
 
void kafka_register_counters (KafkaSourceDriver *self, GHashTable *stats_table, const gchar *label, const gchar *label_value, const gchar **counter_names, gint level)
 
void kafka_unregister_counters (KafkaSourceDriver *self, const gchar *label, const gchar *label_value, StatsCounterItem *counter, const gchar **counter_names)
 
rd_kafka_resp_err_t kafka_update_state (KafkaSourceDriver *self, gboolean lock)
 
void kafka_final_flush (KafkaSourceDriver *self)
 
void kafka_options_defaults (KafkaOptions *self)
 
void kafka_options_destroy (KafkaOptions *self)
 
void kafka_options_merge_config (KafkaOptions *self, GList *props)
 
gboolean kafka_options_set_logging (KafkaOptions *self, const gchar *logging)
 
void kafka_options_set_bootstrap_servers (KafkaOptions *self, const gchar *bootstrap_servers)
 
void kafka_options_set_poll_timeout (KafkaOptions *self, gint poll_timeout)
 
void kafka_options_set_state_update_timeout (KafkaOptions *self, gint state_update_timeout)
 
void kafka_opaque_init (KafkaOpaque *self, LogDriver *driver, KafkaOptions *options)
 
void kafka_opaque_deinit (KafkaOpaque *self)
 
LogDriver * kafka_opaque_driver (KafkaOpaque *self)
 
void kafka_opaque_state_lock (KafkaOpaque *self)
 
void kafka_opaque_state_unlock (KafkaOpaque *self)
 
KafkaConnectedState kafka_opaque_state_get (KafkaOpaque *self)
 
void kafka_opaque_state_set (KafkaOpaque *self, KafkaConnectedState state)
 
gint kafka_opaque_state_get_last_error (KafkaOpaque *self)
 
void kafka_opaque_state_set_last_error (KafkaOpaque *self, gint error)
 
const gchar * kafka_src_worker_get_name (LogThreadedSourceWorker *worker)
 
void kafka_sd_options_defaults (KafkaSourceOptions *self, LogThreadedSourceWorkerOptions *worker_options)
 
void kafka_sd_options_destroy (KafkaSourceOptions *self)
 
gboolean kafka_sd_reopen (LogDriver *s)
 
gboolean kafka_sd_using_queues (KafkaSourceDriver *self)
 
guint kafka_sd_used_queue_num (KafkaSourceDriver *self)
 
guint kafka_sd_worker_queues_len (KafkaSourceDriver *self)
 
GAsyncQueue * kafka_sd_worker_queue (KafkaSourceDriver *self, LogThreadedSourceWorker *worker)
 
void kafka_sd_wait_for_queue (KafkaSourceDriver *self, LogThreadedSourceWorker *worker)
 
void kafka_sd_signal_queue (KafkaSourceDriver *self, LogThreadedSourceWorker *worker)
 
void kafka_sd_signal_queue_ndx (KafkaSourceDriver *self, guint ndx)
 
void kafka_sd_signal_queues (KafkaSourceDriver *self)
 
gboolean kafka_sd_wait_for_queue_processors_to_sleep (KafkaSourceDriver *self, const gdouble iteration_sleep_time, gboolean poll_kafka)
 
void kafka_sd_wait_for_queue_processors_to_exit (KafkaSourceDriver *self, const gdouble iteration_sleep_time)
 
void kafka_sd_drop_queued_messages (KafkaSourceDriver *self)
 
void kafka_sd_wakeup_kafka_queues (KafkaSourceDriver *self)
 
void kafka_sd_signal_reassign (KafkaSourceDriver *self)
 
gboolean kafka_sd_reassign_signaled (KafkaSourceDriver *self)
 
void kafka_sd_signal_assignement_invalidated (KafkaSourceDriver *self)
 
gboolean kafka_sd_assignement_invalidated_signaled (KafkaSourceDriver *self)
 
void kafka_sd_persist_add_msg_bookmark (KafkaSourceDriver *self, AckTracker *ack_tracker, const gchar *msg_topic_name, int32_t msg_partition, int64_t msg_offset)
 
gboolean kafka_sd_store_persist_offset (KafkaSourceDriver *self, KafkaSourcePersist *persist, int64_t offset)
 
gboolean kafka_sd_parallel_processing (KafkaSourceDriver *self)
 
gboolean kafka_sd_persist_all_ready (KafkaSourceDriver *self)
 
gboolean kafka_sd_persist_is_ready (KafkaSourceDriver *self, const gchar *msg_topic_name, int32_t msg_partition)
 
void kafka_sd_update_msg_length_stats (KafkaSourceDriver *self, gsize len)
 
void kafka_sd_inc_msg_topic_stats (KafkaSourceDriver *self, const gchar *topic)
 
void kafka_sd_update_msg_worker_stats (KafkaSourceDriver *self, gint worker_ndx)
 
const gchar * kafka_dest_worker_resolve_template_topic_name (KafkaDestWorker *self, LogMessage *msg)
 
rd_kafka_topic_t * kafka_dest_worker_calculate_topic_from_template (KafkaDestWorker *self, LogMessage *msg)
 
rd_kafka_topic_t * kafka_dest_worker_get_literal_topic (KafkaDestWorker *self)
 
rd_kafka_topic_t * kafka_dest_worker_calculate_topic (KafkaDestWorker *self, LogMessage *msg)
 
rd_kafka_topic_t * kafka_dd_query_insert_topic (KafkaDestDriver *self, const gchar *name)
 
gboolean kafka_dd_init (LogPipe *s)
 

Macro Definition Documentation

◆ KAFKA_DEEP_TRACE

#define KAFKA_DEEP_TRACE   0

◆ kafka_msg_debug

#define kafka_msg_debug   msg_debug

◆ kafka_msg_deep_trace

#define kafka_msg_deep_trace (   msg,
  ... 
)

◆ kafka_msg_trace

#define kafka_msg_trace   msg_trace

◆ MAX_KAFKA_PARTITION_KEY_NAME_LEN

#define MAX_KAFKA_PARTITION_KEY_NAME_LEN   (MAX_KAFKA_TOPIC_NAME_LEN + 1 + 32) /* extra for partition number */

◆ MAX_KAFKA_TOPIC_NAME_LEN

#define MAX_KAFKA_TOPIC_NAME_LEN   249

◆ TOPIC_NAME_ERROR

#define TOPIC_NAME_ERROR   topic_name_error_quark()

Enumeration Type Documentation

◆ KafkaConnectedState

Enumerator
KFS_CONNECTED 
KFS_DISCONNECTED 
KFS_UNKNOWN 

◆ KafkaLogging

Enumerator
KFL_DISABLED 
KFL_KAFKA_LEVEL 
KFL_TRACE_LEVEL 
KFL_UNKNOWN 

◆ KafkaSrcConsumerStrategy

Enumerator
KSCS_ASSIGN 
KSCS_SUBSCRIBE 
KSCS_UNDEFINED 

◆ KafkaSrcPersistStore

Enumerator
KSPS_LOCAL 
KSPS_REMOTE 

◆ KafkaTopicError

Enumerator
TOPIC_LENGTH_ZERO 
TOPIC_DOT_TWO_DOTS 
TOPIC_EXCEEDS_MAX_LENGTH 
TOPIC_INVALID_PATTERN 

Function Documentation

◆ kafka_apply_config_props()

gboolean kafka_apply_config_props ( rd_kafka_conf_t *  conf,
GList *  props,
gchar **  protected_properties,
gsize  protected_properties_num 
)

◆ kafka_conf_get_prop()

gboolean kafka_conf_get_prop ( const rd_kafka_conf_t *  conf,
const gchar *  name,
gchar *  dest,
size_t *  dest_size 
)

◆ kafka_conf_set_prop()

gboolean kafka_conf_set_prop ( rd_kafka_conf_t *  conf,
const gchar *  name,
const gchar *  value 
)

◆ kafka_dd_init()

gboolean kafka_dd_init ( LogPipe *  s)

◆ kafka_dd_query_insert_topic()

rd_kafka_topic_t* kafka_dd_query_insert_topic ( KafkaDestDriver *  self,
const gchar *  name 
)

◆ kafka_dest_worker_calculate_topic()

rd_kafka_topic_t* kafka_dest_worker_calculate_topic ( KafkaDestWorker *  self,
LogMessage *  msg 
)

◆ kafka_dest_worker_calculate_topic_from_template()

rd_kafka_topic_t* kafka_dest_worker_calculate_topic_from_template ( KafkaDestWorker *  self,
LogMessage *  msg 
)

◆ kafka_dest_worker_get_literal_topic()

rd_kafka_topic_t* kafka_dest_worker_get_literal_topic ( KafkaDestWorker *  self)

◆ kafka_dest_worker_resolve_template_topic_name()

const gchar* kafka_dest_worker_resolve_template_topic_name ( KafkaDestWorker *  self,
LogMessage *  msg 
)

◆ kafka_final_flush()

void kafka_final_flush ( KafkaSourceDriver *  self)

◆ kafka_format_partition_key()

gchar* kafka_format_partition_key ( const gchar *  topic,
int32_t  partition,
gchar *  key,
gsize  key_size 
)
inline

◆ kafka_log_callback()

void kafka_log_callback ( const rd_kafka_t *  rkt,
int  level,
const char *  fac,
const char *  msg 
)

◆ kafka_log_partition_list()

void kafka_log_partition_list ( KafkaSourceDriver *  self,
const rd_kafka_topic_partition_list_t *  partitions 
)

◆ kafka_opaque_deinit()

void kafka_opaque_deinit ( KafkaOpaque self)

◆ kafka_opaque_driver()

LogDriver* kafka_opaque_driver ( KafkaOpaque self)
inline

◆ kafka_opaque_init()

void kafka_opaque_init ( KafkaOpaque self,
LogDriver *  driver,
KafkaOptions options 
)

◆ kafka_opaque_state_get()

KafkaConnectedState kafka_opaque_state_get ( KafkaOpaque self)
inline

◆ kafka_opaque_state_get_last_error()

gint kafka_opaque_state_get_last_error ( KafkaOpaque self)
inline

◆ kafka_opaque_state_lock()

void kafka_opaque_state_lock ( KafkaOpaque self)
inline

◆ kafka_opaque_state_set()

void kafka_opaque_state_set ( KafkaOpaque self,
KafkaConnectedState  state 
)
inline

◆ kafka_opaque_state_set_last_error()

void kafka_opaque_state_set_last_error ( KafkaOpaque self,
gint  error 
)
inline

◆ kafka_opaque_state_unlock()

void kafka_opaque_state_unlock ( KafkaOpaque self)
inline

◆ kafka_options_defaults()

void kafka_options_defaults ( KafkaOptions self)

◆ kafka_options_destroy()

void kafka_options_destroy ( KafkaOptions self)

◆ kafka_options_merge_config()

void kafka_options_merge_config ( KafkaOptions self,
GList *  props 
)
inline

◆ kafka_options_set_bootstrap_servers()

void kafka_options_set_bootstrap_servers ( KafkaOptions self,
const gchar *  bootstrap_servers 
)

◆ kafka_options_set_logging()

gboolean kafka_options_set_logging ( KafkaOptions self,
const gchar *  logging 
)

◆ kafka_options_set_poll_timeout()

void kafka_options_set_poll_timeout ( KafkaOptions self,
gint  poll_timeout 
)
inline

◆ kafka_options_set_state_update_timeout()

void kafka_options_set_state_update_timeout ( KafkaOptions self,
gint  state_update_timeout 
)
inline

◆ kafka_register_counters()

void kafka_register_counters ( KafkaSourceDriver *  self,
GHashTable *  stats_table,
const gchar *  label,
const gchar *  label_value,
const gchar **  counter_names,
gint  level 
)

◆ kafka_sd_assignement_invalidated_signaled()

gboolean kafka_sd_assignement_invalidated_signaled ( KafkaSourceDriver *  self)

◆ kafka_sd_drop_queued_messages()

void kafka_sd_drop_queued_messages ( KafkaSourceDriver *  self)

◆ kafka_sd_inc_msg_topic_stats()

void kafka_sd_inc_msg_topic_stats ( KafkaSourceDriver *  self,
const gchar *  topic 
)

◆ kafka_sd_options_defaults()

void kafka_sd_options_defaults ( KafkaSourceOptions *  self,
LogThreadedSourceWorkerOptions worker_options 
)

◆ kafka_sd_options_destroy()

void kafka_sd_options_destroy ( KafkaSourceOptions *  self)

◆ kafka_sd_parallel_processing()

gboolean kafka_sd_parallel_processing ( KafkaSourceDriver *  self)
inline

◆ kafka_sd_persist_add_msg_bookmark()

void kafka_sd_persist_add_msg_bookmark ( KafkaSourceDriver *  self,
AckTracker *  ack_tracker,
const gchar *  msg_topic_name,
int32_t  msg_partition,
int64_t  msg_offset 
)
inline

◆ kafka_sd_persist_all_ready()

gboolean kafka_sd_persist_all_ready ( KafkaSourceDriver *  self)

◆ kafka_sd_persist_is_ready()

gboolean kafka_sd_persist_is_ready ( KafkaSourceDriver *  self,
const gchar *  msg_topic_name,
int32_t  msg_partition 
)

◆ kafka_sd_reassign_signaled()

gboolean kafka_sd_reassign_signaled ( KafkaSourceDriver *  self)

◆ kafka_sd_reopen()

gboolean kafka_sd_reopen ( LogDriver *  s)

◆ kafka_sd_signal_assignement_invalidated()

void kafka_sd_signal_assignement_invalidated ( KafkaSourceDriver *  self)

◆ kafka_sd_signal_queue()

void kafka_sd_signal_queue ( KafkaSourceDriver *  self,
LogThreadedSourceWorker *  worker 
)
inline

◆ kafka_sd_signal_queue_ndx()

void kafka_sd_signal_queue_ndx ( KafkaSourceDriver *  self,
guint  ndx 
)
inline

◆ kafka_sd_signal_queues()

void kafka_sd_signal_queues ( KafkaSourceDriver *  self)
inline

◆ kafka_sd_signal_reassign()

void kafka_sd_signal_reassign ( KafkaSourceDriver *  self)

◆ kafka_sd_store_persist_offset()

gboolean kafka_sd_store_persist_offset ( KafkaSourceDriver *  self,
KafkaSourcePersist *  persist,
int64_t  offset 
)

◆ kafka_sd_update_msg_length_stats()

void kafka_sd_update_msg_length_stats ( KafkaSourceDriver *  self,
gsize  len 
)

◆ kafka_sd_update_msg_worker_stats()

void kafka_sd_update_msg_worker_stats ( KafkaSourceDriver *  self,
gint  worker_ndx 
)

◆ kafka_sd_used_queue_num()

guint kafka_sd_used_queue_num ( KafkaSourceDriver *  self)
inline

◆ kafka_sd_using_queues()

gboolean kafka_sd_using_queues ( KafkaSourceDriver *  self)
inline

◆ kafka_sd_wait_for_queue()

void kafka_sd_wait_for_queue ( KafkaSourceDriver *  self,
LogThreadedSourceWorker *  worker 
)

◆ kafka_sd_wait_for_queue_processors_to_exit()

void kafka_sd_wait_for_queue_processors_to_exit ( KafkaSourceDriver *  self,
const gdouble  iteration_sleep_time 
)

◆ kafka_sd_wait_for_queue_processors_to_sleep()

gboolean kafka_sd_wait_for_queue_processors_to_sleep ( KafkaSourceDriver *  self,
const gdouble  iteration_sleep_time,
gboolean  poll_kafka 
)

◆ kafka_sd_wakeup_kafka_queues()

void kafka_sd_wakeup_kafka_queues ( KafkaSourceDriver *  self)

◆ kafka_sd_worker_queue()

GAsyncQueue* kafka_sd_worker_queue ( KafkaSourceDriver *  self,
LogThreadedSourceWorker *  worker 
)
inline

◆ kafka_sd_worker_queues_len()

guint kafka_sd_worker_queues_len ( KafkaSourceDriver *  self)
inline

◆ kafka_seek_partition()

gboolean kafka_seek_partition ( KafkaSourceDriver *  self,
rd_kafka_topic_partition_t *  partition,
int64_t  offset,
int  timeout_ms 
)

◆ kafka_seek_partitions()

gboolean kafka_seek_partitions ( KafkaSourceDriver *  self,
rd_kafka_topic_partition_list_t *  partitions,
int  timeout_ms 
)

◆ kafka_src_worker_get_name()

const gchar* kafka_src_worker_get_name ( LogThreadedSourceWorker *  worker)
inline

◆ kafka_unregister_counters()

void kafka_unregister_counters ( KafkaSourceDriver *  self,
const gchar *  label,
const gchar *  label_value,
StatsCounterItem counter,
const gchar **  counter_names 
)

◆ kafka_update_state()

rd_kafka_resp_err_t kafka_update_state ( KafkaSourceDriver *  self,
gboolean  lock 
)

◆ kafka_validate_topic_name()

gboolean kafka_validate_topic_name ( const gchar *  name,
GError **  error 
)

◆ kafka_validate_topic_pattern()

gboolean kafka_validate_topic_pattern ( const char *  topic,
GError **  error 
)

◆ topic_name_error_quark()

GQuark topic_name_error_quark ( void  )