25 #ifndef LOGTHRDESTDRV_H
26 #define LOGTHRDESTDRV_H
122 gboolean (*
init)(LogThreadedDestWorker *s);
123 void (*
deinit)(LogThreadedDestWorker *s);
124 gboolean (*
connect)(LogThreadedDestWorker *s);
164 LogThreadedDestWorker *(*construct)(LogThreadedDestDriver *s, gint worker_index);
173 gboolean (*
connect)(LogThreadedDestDriver *s);
196 const gchar *(*format_stats_key)(LogThreadedDestDriver *s, StatsClusterKeyBuilder *kb);
199 static inline gboolean
200 log_threaded_dest_worker_init(LogThreadedDestWorker *
self)
203 return self->init(
self);
208 log_threaded_dest_worker_deinit(LogThreadedDestWorker *
self)
214 static inline gboolean
215 log_threaded_dest_worker_connect(LogThreadedDestWorker *
self)
218 self->connected =
self->connect(
self);
220 self->connected = TRUE;
223 stats_counter_set(
self->metrics.output_unreachable, !
self->connected);
224 return self->connected;
228 log_threaded_dest_worker_disconnect(LogThreadedDestWorker *
self)
230 if (
self->disconnect)
231 self->disconnect(
self);
232 self->connected = FALSE;
233 stats_counter_set(
self->metrics.output_unreachable, !
self->connected);
237 log_threaded_dest_worker_insert(LogThreadedDestWorker *
self, LogMessage *
msg)
242 if (
self->owner->num_workers > 1)
243 self->seq_num = step_sequence_number_atomic(&
self->owner->shared_seq_num);
245 self->seq_num = step_sequence_number(&
self->owner->shared_seq_num);
252 if (
self->metrics.message_delay_sample
260 if (
self->metrics.last_delay_update != now.ut_sec)
262 stats_counter_set_time(
self->metrics.message_delay_sample, diff_msec);
263 stats_counter_set_time(
self->metrics.message_delay_sample_age, now.ut_sec);
264 self->metrics.last_delay_update = now.ut_sec;
279 self->last_flush_time = iv_now;
285 log_threaded_dest_driver_flush(LogThreadedDestDriver *
self)
297 LogThreadedDestDriver *owner,
GProcessMode mode
Definition: gprocess.c:118
@ LM_TS_RECVD
Definition: logmsg.h:70
@ LF_LOCAL
Definition: logmsg.h:161
void log_threaded_dest_driver_init_instance(LogThreadedDestDriver *self, GlobalConfig *cfg)
Definition: logthrdestdrv.c:1493
void log_threaded_dest_driver_free(LogPipe *s)
Definition: logthrdestdrv.c:1483
void log_threaded_dest_worker_deinit_method(LogThreadedDestWorker *self)
Definition: logthrdestdrv.c:976
void log_threaded_dest_worker_drop_messages(LogThreadedDestWorker *self, gint batch_size)
Definition: logthrdestdrv.c:112
void log_threaded_dest_driver_set_max_retries_on_error(LogDriver *s, gint max_retries)
Definition: logthrdestdrv.c:1137
gboolean log_threaded_dest_worker_init_method(LogThreadedDestWorker *self)
Definition: logthrdestdrv.c:964
void log_threaded_dest_driver_unregister_aggregated_stats(LogThreadedDestDriver *self)
Definition: logthrdestdrv.c:1240
void log_threaded_dest_driver_register_aggregated_stats(LogThreadedDestDriver *self)
Definition: logthrdestdrv.c:1202
LogThreadedResult
Definition: logthrdestdrv.h:56
@ LTR_EXPLICIT_ACK_MGMT
Definition: logthrdestdrv.h:59
@ LTR_DROP
Definition: logthrdestdrv.h:57
@ LTR_RETRY
Definition: logthrdestdrv.h:63
@ LTR_MAX
Definition: logthrdestdrv.h:64
@ LTR_SUCCESS
Definition: logthrdestdrv.h:60
@ LTR_NOT_CONNECTED
Definition: logthrdestdrv.h:62
@ LTR_QUEUED
Definition: logthrdestdrv.h:61
@ LTR_ERROR
Definition: logthrdestdrv.h:58
void log_threaded_dest_worker_free(LogThreadedDestWorker *self)
Definition: logthrdestdrv.c:1014
void log_threaded_dest_driver_set_time_reopen(LogDriver *s, time_t time_reopen)
Definition: logthrdestdrv.c:75
void log_threaded_dest_worker_rewind_messages(LogThreadedDestWorker *self, gint batch_size)
Definition: logthrdestdrv.c:121
gboolean log_threaded_dest_driver_start_workers(LogPipe *s)
Definition: logthrdestdrv.c:1434
void log_threaded_dest_worker_free_method(LogThreadedDestWorker *self)
Definition: logthrdestdrv.c:983
gboolean log_threaded_dest_driver_deinit_method(LogPipe *s)
Definition: logthrdestdrv.c:1463
void log_threaded_dest_driver_set_batch_lines(LogDriver *s, gint batch_lines)
Definition: logthrdestdrv.c:59
gboolean log_threaded_dest_driver_process_flag(LogDriver *driver, const gchar *flag)
Definition: logthrdestdrv.c:93
void log_threaded_dest_worker_init_instance(LogThreadedDestWorker *self, LogThreadedDestDriver *owner, gint worker_index)
Definition: logthrdestdrv.c:991
void log_threaded_dest_driver_set_flush_on_worker_key_change(LogDriver *s, gboolean f)
Definition: logthrdestdrv.c:1041
const gchar * log_threaded_result_to_str(LogThreadedResult self)
Definition: logthrdestdrv.c:40
gboolean log_threaded_dest_driver_init_method(LogPipe *s)
Definition: logthrdestdrv.c:1387
void log_threaded_dest_worker_ack_messages(LogThreadedDestWorker *self, gint batch_size)
Definition: logthrdestdrv.c:103
@ LTDF_SEQNUM_ALL
Definition: logthrdestdrv.h:69
@ LTDF_SEQNUM
Definition: logthrdestdrv.h:70
void log_threaded_dest_driver_set_num_workers(LogDriver *s, gint num_workers)
Definition: logthrdestdrv.c:1024
void log_threaded_dest_driver_insert_msg_length_stats(LogThreadedDestDriver *self, gsize len)
Definition: logthrdestdrv.c:1188
void log_threaded_dest_worker_wakeup_when_suspended(LogThreadedDestWorker *self)
Definition: logthrdestdrv.c:663
void log_threaded_dest_worker_written_bytes_add(LogThreadedDestWorker *self, gsize b)
Definition: logthrdestdrv.c:1182
void log_threaded_dest_driver_set_batch_timeout(LogDriver *s, gint batch_timeout)
Definition: logthrdestdrv.c:67
LogThreadedFlushMode
Definition: logthrdestdrv.h:44
@ LTF_FLUSH_EXPEDITE
Definition: logthrdestdrv.h:52
@ LTF_FLUSH_NORMAL
Definition: logthrdestdrv.h:48
void log_threaded_dest_driver_set_worker_partition_key_ref(LogDriver *s, LogTemplate *key)
Definition: logthrdestdrv.c:1032
void log_threaded_dest_driver_insert_batch_length_stats(LogThreadedDestDriver *self, gsize len)
Definition: logthrdestdrv.c:1195
#define self
Definition: rcptid.c:38
Definition: stats-compat.h:55
Definition: stats-counter.h:67
Definition: logthrdestdrv.h:134
void(* disconnect)(LogThreadedDestDriver *s)
Definition: logthrdestdrv.h:174
struct _LogThreadedDestDriver::@59 metrics
gboolean flush_on_key_change
Definition: logthrdestdrv.h:184
StatsClusterKey * output_events_sc_key
Definition: logthrdestdrv.h:139
void(* thread_deinit)(LogThreadedDestDriver *s)
Definition: logthrdestdrv.h:172
StatsAggregator * CPS
Definition: logthrdestdrv.h:152
guint32 flags
Definition: logthrdestdrv.h:194
gint batch_timeout
Definition: logthrdestdrv.h:156
gint batch_lines
Definition: logthrdestdrv.h:155
StatsAggregator * average_batch_size
Definition: logthrdestdrv.h:151
gint num_workers
Definition: logthrdestdrv.h:180
LogThreadedResult(* insert)(LogThreadedDestDriver *s, LogMessage *msg)
Definition: logthrdestdrv.h:175
time_t time_reopen
Definition: logthrdestdrv.h:158
gint created_workers
Definition: logthrdestdrv.h:181
LogThreadedDestWorker instance
Definition: logthrdestdrv.h:170
StatsCounterItem * processed_messages
Definition: logthrdestdrv.h:144
StatsAggregator * max_message_size
Definition: logthrdestdrv.h:148
StatsAggregator * max_batch_size
Definition: logthrdestdrv.h:150
LogDestDriver super
Definition: logthrdestdrv.h:135
LogTemplate * worker_partition_key
Definition: logthrdestdrv.h:185
StatsAggregator * average_messages_size
Definition: logthrdestdrv.h:149
guint last_worker
Definition: logthrdestdrv.h:182
gboolean under_termination
Definition: logthrdestdrv.h:157
LogThreadedDestWorker ** workers
Definition: logthrdestdrv.h:179
struct _LogThreadedDestDriver::@60 worker
void(* thread_init)(LogThreadedDestDriver *s)
Definition: logthrdestdrv.h:171
StatsCounterItem * written_messages
Definition: logthrdestdrv.h:145
StatsClusterKey * processed_sc_key
Definition: logthrdestdrv.h:140
gint stats_source
Definition: logthrdestdrv.h:186
StatsCounterItem * output_event_retries
Definition: logthrdestdrv.h:146
StatsClusterKey * output_event_retries_sc_key
Definition: logthrdestdrv.h:141
StatsCounterItem * dropped_messages
Definition: logthrdestdrv.h:143
gint32 shared_seq_num
Definition: logthrdestdrv.h:193
LogThreadedResult(* flush)(LogThreadedDestDriver *s)
Definition: logthrdestdrv.h:176
gboolean(* connect)(LogThreadedDestDriver *s)
Definition: logthrdestdrv.h:173
gint retries_on_error_max
Definition: logthrdestdrv.h:159
guint retries_max
Definition: logthrdestdrv.h:160
Definition: logthrdestdrv.h:78
struct _LogThreadedDestWorker::@58 metrics
StatsCounterItem * message_delay_sample_age
Definition: logthrdestdrv.h:117
struct iv_task do_work
Definition: logthrdestdrv.h:81
gboolean(* init)(LogThreadedDestWorker *s)
Definition: logthrdestdrv.h:122
LogThreadedResult(* flush)(LogThreadedDestWorker *s, LogThreadedFlushMode mode)
Definition: logthrdestdrv.h:127
struct iv_timer timer_reopen
Definition: logthrdestdrv.h:84
struct iv_event wake_up_event
Definition: logthrdestdrv.h:82
gint worker_index
Definition: logthrdestdrv.h:90
void(* disconnect)(LogThreadedDestWorker *s)
Definition: logthrdestdrv.h:125
gboolean connected
Definition: logthrdestdrv.h:91
GString * last_key
Definition: logthrdestdrv.h:104
LogThreadedResult(* insert)(LogThreadedDestWorker *s, LogMessage *msg)
Definition: logthrdestdrv.h:126
struct iv_timer timer_throttle
Definition: logthrdestdrv.h:85
void(* deinit)(LogThreadedDestWorker *s)
Definition: logthrdestdrv.h:123
gint32 seq_num
Definition: logthrdestdrv.h:96
StatsClusterKey * output_event_bytes_sc_key
Definition: logthrdestdrv.h:109
gint64 last_delay_update
Definition: logthrdestdrv.h:119
struct _LogThreadedDestWorker::@57 partitioning
gboolean suspended
Definition: logthrdestdrv.h:99
LogQueue * queue
Definition: logthrdestdrv.h:80
struct iv_event shutdown_event
Definition: logthrdestdrv.h:83
gint rewound_batch_size
Definition: logthrdestdrv.h:93
StatsByteCounter written_bytes
Definition: logthrdestdrv.h:114
StatsCounterItem * output_unreachable
Definition: logthrdestdrv.h:115
struct timespec last_flush_time
Definition: logthrdestdrv.h:97
LogThreadedDestDriver * owner
Definition: logthrdestdrv.h:88
struct iv_timer timer_flush
Definition: logthrdestdrv.h:86
guint retries_counter
Definition: logthrdestdrv.h:95
gint batch_size
Definition: logthrdestdrv.h:92
StatsClusterKey * message_delay_sample_age_key
Definition: logthrdestdrv.h:112
StatsCounterItem * message_delay_sample
Definition: logthrdestdrv.h:116
StatsClusterKey * output_unreachable_key
Definition: logthrdestdrv.h:110
gint retries_on_error_counter
Definition: logthrdestdrv.h:94
time_t time_reopen
Definition: logthrdestdrv.h:100
MainLoopThreadedWorker thread
Definition: logthrdestdrv.h:79
void(* free_fn)(LogThreadedDestWorker *s)
Definition: logthrdestdrv.h:128
StatsClusterKey * message_delay_sample_key
Definition: logthrdestdrv.h:111
gboolean(* connect)(LogThreadedDestWorker *s)
Definition: logthrdestdrv.h:124
gboolean enable_batching
Definition: logthrdestdrv.h:98
GlobalConfig * cfg
Definition: test_batched_ack_tracker.c:34
HTTPDestinationDriver * driver
Definition: test_http-signal_slot.c:35
GString * result
Definition: test_lexer_block.c:34
LogMessage * msg
Definition: test_rename.c:35
struct tm key
Definition: cache.c:63
gint64 unix_time_diff_in_msec(const UnixTime *a, const UnixTime *b)
Definition: unixtime.c:329
void unix_time_set_now(UnixTime *self)
Definition: unixtime.c:40