#include "logthrsource/logthrfetcherdrv.h"#include "logthrdest/logthrdestdrv.h"#include <librdkafka/rdkafka.h>#include "kafka-source-driver.h"#include "kafka-source-worker.h"#include "kafka-dest-driver.h"#include "kafka-dest-worker.h"#include "kafka-source-persist.h"

Go to the source code of this file.
Classes | |
| struct | KafkaOptions |
| struct | KafkaState |
| struct | KafkaOpaque |
| struct | KafkaSourceOptions |
| struct | KafkaSourceWorker |
| struct | KafkaSourceDriver |
| struct | KafkaDestWorker |
| struct | KafkaDestinationOptions |
| struct | KafkaDestDriver |
Macros | |
| #define | KAFKA_DEEP_TRACE 0 |
| #define | kafka_msg_debug msg_debug |
| #define | kafka_msg_trace msg_trace |
| #define | kafka_msg_deep_trace(msg, ...) |
| #define | MAX_KAFKA_TOPIC_NAME_LEN 249 |
| #define | MAX_KAFKA_PARTITION_KEY_NAME_LEN (MAX_KAFKA_TOPIC_NAME_LEN + 1 + 32) /* extra for partition number */ |
| #define | TOPIC_NAME_ERROR topic_name_error_quark() |
Enumerations | |
| enum | KafkaLogging { KFL_DISABLED , KFL_KAFKA_LEVEL , KFL_TRACE_LEVEL , KFL_UNKNOWN } |
| enum | KafkaConnectedState { KFS_CONNECTED , KFS_DISCONNECTED , KFS_UNKNOWN } |
| enum | KafkaTopicError { TOPIC_LENGTH_ZERO , TOPIC_DOT_TWO_DOTS , TOPIC_EXCEEDS_MAX_LENGTH , TOPIC_INVALID_PATTERN } |
| enum | KafkaSrcConsumerStrategy { KSCS_ASSIGN , KSCS_SUBSCRIBE , KSCS_UNDEFINED } |
| enum | KafkaSrcPersistStore { KSPS_LOCAL , KSPS_REMOTE } |
Functions | |
| GQuark | topic_name_error_quark (void) |
| gboolean | kafka_validate_topic_pattern (const char *topic, GError **error) |
| gboolean | kafka_validate_topic_name (const gchar *name, GError **error) |
| gboolean | kafka_conf_get_prop (const rd_kafka_conf_t *conf, const gchar *name, gchar *dest, size_t *dest_size) |
| gboolean | kafka_conf_set_prop (rd_kafka_conf_t *conf, const gchar *name, const gchar *value) |
| gboolean | kafka_apply_config_props (rd_kafka_conf_t *conf, GList *props, gchar **protected_properties, gsize protected_properties_num) |
| gboolean | kafka_seek_partition (KafkaSourceDriver *self, rd_kafka_topic_partition_t *partition, int64_t offset, int timeout_ms) |
| gboolean | kafka_seek_partitions (KafkaSourceDriver *self, rd_kafka_topic_partition_list_t *partitions, int timeout_ms) |
| gchar * | kafka_format_partition_key (const gchar *topic, int32_t partition, gchar *key, gsize key_size) |
| void | kafka_log_partition_list (KafkaSourceDriver *self, const rd_kafka_topic_partition_list_t *partitions) |
| void | kafka_log_callback (const rd_kafka_t *rkt, int level, const char *fac, const char *msg) |
| void | kafka_register_counters (KafkaSourceDriver *self, GHashTable *stats_table, const gchar *label, const gchar *label_value, const gchar **counter_names, gint level) |
| void | kafka_unregister_counters (KafkaSourceDriver *self, const gchar *label, const gchar *label_value, StatsCounterItem *counter, const gchar **counter_names) |
| rd_kafka_resp_err_t | kafka_update_state (KafkaSourceDriver *self, gboolean lock) |
| void | kafka_final_flush (KafkaSourceDriver *self) |
| void | kafka_options_defaults (KafkaOptions *self) |
| void | kafka_options_destroy (KafkaOptions *self) |
| void | kafka_options_merge_config (KafkaOptions *self, GList *props) |
| gboolean | kafka_options_set_logging (KafkaOptions *self, const gchar *logging) |
| void | kafka_options_set_bootstrap_servers (KafkaOptions *self, const gchar *bootstrap_servers) |
| void | kafka_options_set_poll_timeout (KafkaOptions *self, gint poll_timeout) |
| void | kafka_options_set_state_update_timeout (KafkaOptions *self, gint state_update_timeout) |
| void | kafka_opaque_init (KafkaOpaque *self, LogDriver *driver, KafkaOptions *options) |
| void | kafka_opaque_deinit (KafkaOpaque *self) |
| LogDriver * | kafka_opaque_driver (KafkaOpaque *self) |
| void | kafka_opaque_state_lock (KafkaOpaque *self) |
| void | kafka_opaque_state_unlock (KafkaOpaque *self) |
| KafkaConnectedState | kafka_opaque_state_get (KafkaOpaque *self) |
| void | kafka_opaque_state_set (KafkaOpaque *self, KafkaConnectedState state) |
| gint | kafka_opaque_state_get_last_error (KafkaOpaque *self) |
| void | kafka_opaque_state_set_last_error (KafkaOpaque *self, gint error) |
| const gchar * | kafka_src_worker_get_name (LogThreadedSourceWorker *worker) |
| void | kafka_sd_options_defaults (KafkaSourceOptions *self, LogThreadedSourceWorkerOptions *worker_options) |
| void | kafka_sd_options_destroy (KafkaSourceOptions *self) |
| gboolean | kafka_sd_reopen (LogDriver *s) |
| gboolean | kafka_sd_using_queues (KafkaSourceDriver *self) |
| guint | kafka_sd_used_queue_num (KafkaSourceDriver *self) |
| guint | kafka_sd_worker_queues_len (KafkaSourceDriver *self) |
| GAsyncQueue * | kafka_sd_worker_queue (KafkaSourceDriver *self, LogThreadedSourceWorker *worker) |
| void | kafka_sd_wait_for_queue (KafkaSourceDriver *self, LogThreadedSourceWorker *worker) |
| void | kafka_sd_signal_queue (KafkaSourceDriver *self, LogThreadedSourceWorker *worker) |
| void | kafka_sd_signal_queue_ndx (KafkaSourceDriver *self, guint ndx) |
| void | kafka_sd_signal_queues (KafkaSourceDriver *self) |
| gboolean | kafka_sd_wait_for_queue_processors_to_sleep (KafkaSourceDriver *self, const gdouble iteration_sleep_time, gboolean poll_kafka) |
| void | kafka_sd_wait_for_queue_processors_to_exit (KafkaSourceDriver *self, const gdouble iteration_sleep_time) |
| void | kafka_sd_drop_queued_messages (KafkaSourceDriver *self) |
| void | kafka_sd_wakeup_kafka_queues (KafkaSourceDriver *self) |
| void | kafka_sd_signal_reassign (KafkaSourceDriver *self) |
| gboolean | kafka_sd_reassign_signaled (KafkaSourceDriver *self) |
| void | kafka_sd_signal_assignement_invalidated (KafkaSourceDriver *self) |
| gboolean | kafka_sd_assignement_invalidated_signaled (KafkaSourceDriver *self) |
| void | kafka_sd_persist_add_msg_bookmark (KafkaSourceDriver *self, AckTracker *ack_tracker, const gchar *msg_topic_name, int32_t msg_partition, int64_t msg_offset) |
| gboolean | kafka_sd_store_persist_offset (KafkaSourceDriver *self, KafkaSourcePersist *persist, int64_t offset) |
| gboolean | kafka_sd_parallel_processing (KafkaSourceDriver *self) |
| gboolean | kafka_sd_persist_all_ready (KafkaSourceDriver *self) |
| gboolean | kafka_sd_persist_is_ready (KafkaSourceDriver *self, const gchar *msg_topic_name, int32_t msg_partition) |
| void | kafka_sd_update_msg_length_stats (KafkaSourceDriver *self, gsize len) |
| void | kafka_sd_inc_msg_topic_stats (KafkaSourceDriver *self, const gchar *topic) |
| void | kafka_sd_update_msg_worker_stats (KafkaSourceDriver *self, gint worker_ndx) |
| const gchar * | kafka_dest_worker_resolve_template_topic_name (KafkaDestWorker *self, LogMessage *msg) |
| rd_kafka_topic_t * | kafka_dest_worker_calculate_topic_from_template (KafkaDestWorker *self, LogMessage *msg) |
| rd_kafka_topic_t * | kafka_dest_worker_get_literal_topic (KafkaDestWorker *self) |
| rd_kafka_topic_t * | kafka_dest_worker_calculate_topic (KafkaDestWorker *self, LogMessage *msg) |
| rd_kafka_topic_t * | kafka_dd_query_insert_topic (KafkaDestDriver *self, const gchar *name) |
| gboolean | kafka_dd_init (LogPipe *s) |
| #define KAFKA_DEEP_TRACE 0 |
| #define kafka_msg_debug msg_debug |
| #define kafka_msg_deep_trace | ( | msg, | |
| ... | |||
| ) |
| #define kafka_msg_trace msg_trace |
| #define MAX_KAFKA_PARTITION_KEY_NAME_LEN (MAX_KAFKA_TOPIC_NAME_LEN + 1 + 32) /* extra for partition number */ |
| #define MAX_KAFKA_TOPIC_NAME_LEN 249 |
| #define TOPIC_NAME_ERROR topic_name_error_quark() |
| enum KafkaConnectedState |
| enum KafkaLogging |
| enum KafkaSrcPersistStore |
| enum KafkaTopicError |
| gboolean kafka_apply_config_props | ( | rd_kafka_conf_t * | conf, |
| GList * | props, | ||
| gchar ** | protected_properties, | ||
| gsize | protected_properties_num | ||
| ) |
| gboolean kafka_conf_get_prop | ( | const rd_kafka_conf_t * | conf, |
| const gchar * | name, | ||
| gchar * | dest, | ||
| size_t * | dest_size | ||
| ) |
| gboolean kafka_conf_set_prop | ( | rd_kafka_conf_t * | conf, |
| const gchar * | name, | ||
| const gchar * | value | ||
| ) |
| gboolean kafka_dd_init | ( | LogPipe * | s | ) |
| rd_kafka_topic_t* kafka_dd_query_insert_topic | ( | KafkaDestDriver * | self, |
| const gchar * | name | ||
| ) |
| rd_kafka_topic_t* kafka_dest_worker_calculate_topic | ( | KafkaDestWorker * | self, |
| LogMessage * | msg | ||
| ) |
| rd_kafka_topic_t* kafka_dest_worker_calculate_topic_from_template | ( | KafkaDestWorker * | self, |
| LogMessage * | msg | ||
| ) |
| rd_kafka_topic_t* kafka_dest_worker_get_literal_topic | ( | KafkaDestWorker * | self | ) |
| const gchar* kafka_dest_worker_resolve_template_topic_name | ( | KafkaDestWorker * | self, |
| LogMessage * | msg | ||
| ) |
| void kafka_final_flush | ( | KafkaSourceDriver * | self | ) |
|
inline |
| void kafka_log_callback | ( | const rd_kafka_t * | rkt, |
| int | level, | ||
| const char * | fac, | ||
| const char * | msg | ||
| ) |
| void kafka_log_partition_list | ( | KafkaSourceDriver * | self, |
| const rd_kafka_topic_partition_list_t * | partitions | ||
| ) |
| void kafka_opaque_deinit | ( | KafkaOpaque * | self | ) |
|
inline |
| void kafka_opaque_init | ( | KafkaOpaque * | self, |
| LogDriver * | driver, | ||
| KafkaOptions * | options | ||
| ) |
|
inline |
|
inline |
|
inline |
|
inline |
|
inline |
|
inline |
| void kafka_options_defaults | ( | KafkaOptions * | self | ) |
| void kafka_options_destroy | ( | KafkaOptions * | self | ) |
|
inline |
| void kafka_options_set_bootstrap_servers | ( | KafkaOptions * | self, |
| const gchar * | bootstrap_servers | ||
| ) |
| gboolean kafka_options_set_logging | ( | KafkaOptions * | self, |
| const gchar * | logging | ||
| ) |
|
inline |
|
inline |
| void kafka_register_counters | ( | KafkaSourceDriver * | self, |
| GHashTable * | stats_table, | ||
| const gchar * | label, | ||
| const gchar * | label_value, | ||
| const gchar ** | counter_names, | ||
| gint | level | ||
| ) |
| gboolean kafka_sd_assignement_invalidated_signaled | ( | KafkaSourceDriver * | self | ) |
| void kafka_sd_drop_queued_messages | ( | KafkaSourceDriver * | self | ) |
| void kafka_sd_inc_msg_topic_stats | ( | KafkaSourceDriver * | self, |
| const gchar * | topic | ||
| ) |
| void kafka_sd_options_defaults | ( | KafkaSourceOptions * | self, |
| LogThreadedSourceWorkerOptions * | worker_options | ||
| ) |
| void kafka_sd_options_destroy | ( | KafkaSourceOptions * | self | ) |
|
inline |
|
inline |
| gboolean kafka_sd_persist_all_ready | ( | KafkaSourceDriver * | self | ) |
| gboolean kafka_sd_persist_is_ready | ( | KafkaSourceDriver * | self, |
| const gchar * | msg_topic_name, | ||
| int32_t | msg_partition | ||
| ) |
| gboolean kafka_sd_reassign_signaled | ( | KafkaSourceDriver * | self | ) |
| gboolean kafka_sd_reopen | ( | LogDriver * | s | ) |
| void kafka_sd_signal_assignement_invalidated | ( | KafkaSourceDriver * | self | ) |
|
inline |
|
inline |
|
inline |
| void kafka_sd_signal_reassign | ( | KafkaSourceDriver * | self | ) |
| gboolean kafka_sd_store_persist_offset | ( | KafkaSourceDriver * | self, |
| KafkaSourcePersist * | persist, | ||
| int64_t | offset | ||
| ) |
| void kafka_sd_update_msg_length_stats | ( | KafkaSourceDriver * | self, |
| gsize | len | ||
| ) |
| void kafka_sd_update_msg_worker_stats | ( | KafkaSourceDriver * | self, |
| gint | worker_ndx | ||
| ) |
|
inline |
|
inline |
| void kafka_sd_wait_for_queue | ( | KafkaSourceDriver * | self, |
| LogThreadedSourceWorker * | worker | ||
| ) |
| void kafka_sd_wait_for_queue_processors_to_exit | ( | KafkaSourceDriver * | self, |
| const gdouble | iteration_sleep_time | ||
| ) |
| gboolean kafka_sd_wait_for_queue_processors_to_sleep | ( | KafkaSourceDriver * | self, |
| const gdouble | iteration_sleep_time, | ||
| gboolean | poll_kafka | ||
| ) |
| void kafka_sd_wakeup_kafka_queues | ( | KafkaSourceDriver * | self | ) |
|
inline |
|
inline |
| gboolean kafka_seek_partition | ( | KafkaSourceDriver * | self, |
| rd_kafka_topic_partition_t * | partition, | ||
| int64_t | offset, | ||
| int | timeout_ms | ||
| ) |
| gboolean kafka_seek_partitions | ( | KafkaSourceDriver * | self, |
| rd_kafka_topic_partition_list_t * | partitions, | ||
| int | timeout_ms | ||
| ) |
|
inline |
| void kafka_unregister_counters | ( | KafkaSourceDriver * | self, |
| const gchar * | label, | ||
| const gchar * | label_value, | ||
| StatsCounterItem * | counter, | ||
| const gchar ** | counter_names | ||
| ) |
| rd_kafka_resp_err_t kafka_update_state | ( | KafkaSourceDriver * | self, |
| gboolean | lock | ||
| ) |
| gboolean kafka_validate_topic_name | ( | const gchar * | name, |
| GError ** | error | ||
| ) |
| gboolean kafka_validate_topic_pattern | ( | const char * | topic, |
| GError ** | error | ||
| ) |
| GQuark topic_name_error_quark | ( | void | ) |