syslog-ng source
logthrdestdrv.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013, 2014 Balabit
3  * Copyright (c) 2013, 2014 Gergely Nagy <algernon@balabit.hu>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #ifndef LOGTHRDESTDRV_H
26 #define LOGTHRDESTDRV_H
27 
28 #include "syslog-ng.h"
29 #include "driver.h"
30 #include "stats/stats-registry.h"
32 #include "stats/stats-compat.h"
34 #include "logqueue.h"
35 #include "seqnum.h"
37 #include "timeutils/misc.h"
38 #include "template/templates.h"
39 
40 #include <iv.h>
41 #include <iv_event.h>
42 
43 typedef enum
44 {
45  /* flush modes */
46 
47  /* flush the infligh messages */
49 
50  /* expedite flush, to be used at reload, when the persistency of the queue
51  * contents is ensured */
54 
55 typedef enum
56 {
64  LTR_MAX
66 
67 enum
68 {
69  LTDF_SEQNUM_ALL = 0x0001,
70  LTDF_SEQNUM = 0x0002,
71  /* NOTE: everything >= 0x1000 is driver specific */
72 };
73 
74 typedef struct _LogThreadedDestDriver LogThreadedDestDriver;
75 typedef struct _LogThreadedDestWorker LogThreadedDestWorker;
76 
78 {
79  MainLoopThreadedWorker thread;
80  LogQueue *queue;
81  struct iv_task do_work;
82  struct iv_event wake_up_event;
83  struct iv_event shutdown_event;
84  struct iv_timer timer_reopen;
85  struct iv_timer timer_throttle;
86  struct iv_timer timer_flush;
87 
88  LogThreadedDestDriver *owner;
89 
91  gboolean connected;
92  gint batch_size;
96  gint32 seq_num;
97  struct timespec last_flush_time;
98  gboolean enable_batching;
99  gboolean suspended;
100  time_t time_reopen;
101 
102  struct
103  {
104  GString *last_key;
106 
107  struct
108  {
109  StatsClusterKey *output_event_bytes_sc_key;
110  StatsClusterKey *output_unreachable_key;
111  StatsClusterKey *message_delay_sample_key;
113 
118 
121 
122  gboolean (*init)(LogThreadedDestWorker *s);
123  void (*deinit)(LogThreadedDestWorker *s);
124  gboolean (*connect)(LogThreadedDestWorker *s);
125  void (*disconnect)(LogThreadedDestWorker *s);
126  LogThreadedResult (*insert)(LogThreadedDestWorker *s, LogMessage *msg);
127  LogThreadedResult (*flush)(LogThreadedDestWorker *s, LogThreadedFlushMode mode);
128  void (*free_fn)(LogThreadedDestWorker *s);
129 };
130 
132 
134 {
135  LogDestDriver super;
136 
137  struct
138  {
139  StatsClusterKey *output_events_sc_key;
140  StatsClusterKey *processed_sc_key;
141  StatsClusterKey *output_event_retries_sc_key;
142 
147 
148  StatsAggregator *max_message_size;
149  StatsAggregator *average_messages_size;
150  StatsAggregator *max_batch_size;
151  StatsAggregator *average_batch_size;
152  StatsAggregator *CPS;
154 
158  time_t time_reopen;
160  guint retries_max;
161 
162  struct
163  {
164  LogThreadedDestWorker *(*construct)(LogThreadedDestDriver *s, gint worker_index);
165 
166  /* this is a compatibility layer that can be removed once all drivers have
167  * been migrated to the use of LogThreadedDestWorker based interface.
168  * Right now, if a driver is not overriding the Worker instance, we would
169  * be calling these methods from the functions named `_compat_*()`. */
170  LogThreadedDestWorker instance;
171  void (*thread_init)(LogThreadedDestDriver *s);
172  void (*thread_deinit)(LogThreadedDestDriver *s);
173  gboolean (*connect)(LogThreadedDestDriver *s);
174  void (*disconnect)(LogThreadedDestDriver *s);
175  LogThreadedResult (*insert)(LogThreadedDestDriver *s, LogMessage *msg);
176  LogThreadedResult (*flush)(LogThreadedDestDriver *s);
178 
179  LogThreadedDestWorker **workers;
182  guint last_worker;
183 
185  LogTemplate *worker_partition_key;
187 
188  /* this counter is not thread safe if there are multiple worker threads,
189  * in that case, one needs to use LogThreadedDestWorker->seq_num, which is
190  * static for a single insert() invocation, whereas this might be
191  * increased in parallel by the multiple threads. */
192 
194  guint32 flags;
195 
196  const gchar *(*format_stats_key)(LogThreadedDestDriver *s, StatsClusterKeyBuilder *kb);
197 };
198 
199 static inline gboolean
200 log_threaded_dest_worker_init(LogThreadedDestWorker *self)
201 {
202  if (self->init)
203  return self->init(self);
204  return TRUE;
205 }
206 
207 static inline void
208 log_threaded_dest_worker_deinit(LogThreadedDestWorker *self)
209 {
210  if (self->deinit)
211  self->deinit(self);
212 }
213 
214 static inline gboolean
215 log_threaded_dest_worker_connect(LogThreadedDestWorker *self)
216 {
217  if (self->connect)
218  self->connected = self->connect(self);
219  else
220  self->connected = TRUE;
221 
222 
223  stats_counter_set(self->metrics.output_unreachable, !self->connected);
224  return self->connected;
225 }
226 
227 static inline void
228 log_threaded_dest_worker_disconnect(LogThreadedDestWorker *self)
229 {
230  if (self->disconnect)
231  self->disconnect(self);
232  self->connected = FALSE;
233  stats_counter_set(self->metrics.output_unreachable, !self->connected);
234 }
235 
236 static inline LogThreadedResult
237 log_threaded_dest_worker_insert(LogThreadedDestWorker *self, LogMessage *msg)
238 {
239  if ((self->owner->flags & LTDF_SEQNUM) &&
240  ((self->owner->flags & LTDF_SEQNUM_ALL) || (msg->flags & LF_LOCAL)))
241  {
242  if (self->owner->num_workers > 1)
243  self->seq_num = step_sequence_number_atomic(&self->owner->shared_seq_num);
244  else
245  self->seq_num = step_sequence_number(&self->owner->shared_seq_num);
246  }
247  else
248  self->seq_num = 0;
249 
250  LogThreadedResult result = self->insert(self, msg);
251 
252  if (self->metrics.message_delay_sample
254  {
255  UnixTime now;
256 
257  unix_time_set_now(&now);
258  gint64 diff_msec = unix_time_diff_in_msec(&now, &msg->timestamps[LM_TS_RECVD]);
259 
260  if (self->metrics.last_delay_update != now.ut_sec)
261  {
262  stats_counter_set_time(self->metrics.message_delay_sample, diff_msec);
263  stats_counter_set_time(self->metrics.message_delay_sample_age, now.ut_sec);
264  self->metrics.last_delay_update = now.ut_sec;
265  }
266  }
267 
268  return result;
269 }
270 
271 static inline LogThreadedResult
272 log_threaded_dest_worker_flush(LogThreadedDestWorker *self, LogThreadedFlushMode mode)
273 {
275 
276  if (self->flush)
277  result = self->flush(self, mode);
278  iv_validate_now();
279  self->last_flush_time = iv_now;
280  return result;
281 }
282 
283 /* function for drivers that are not yet using the worker API */
284 static inline LogThreadedResult
285 log_threaded_dest_driver_flush(LogThreadedDestDriver *self)
286 {
287  return log_threaded_dest_worker_flush(&self->worker.instance, LTF_FLUSH_NORMAL);
288 }
289 
290 void log_threaded_dest_worker_ack_messages(LogThreadedDestWorker *self, gint batch_size);
291 void log_threaded_dest_worker_drop_messages(LogThreadedDestWorker *self, gint batch_size);
292 void log_threaded_dest_worker_rewind_messages(LogThreadedDestWorker *self, gint batch_size);
293 void log_threaded_dest_worker_wakeup_when_suspended(LogThreadedDestWorker *self);
294 gboolean log_threaded_dest_worker_init_method(LogThreadedDestWorker *self);
295 void log_threaded_dest_worker_deinit_method(LogThreadedDestWorker *self);
296 void log_threaded_dest_worker_init_instance(LogThreadedDestWorker *self,
297  LogThreadedDestDriver *owner,
298  gint worker_index);
299 void log_threaded_dest_worker_free_method(LogThreadedDestWorker *self);
300 void log_threaded_dest_worker_free(LogThreadedDestWorker *self);
301 
302 void log_threaded_dest_worker_written_bytes_add(LogThreadedDestWorker *self, gsize b);
303 void log_threaded_dest_driver_insert_msg_length_stats(LogThreadedDestDriver *self, gsize len);
304 void log_threaded_dest_driver_insert_batch_length_stats(LogThreadedDestDriver *self, gsize len);
305 void log_threaded_dest_driver_register_aggregated_stats(LogThreadedDestDriver *self);
306 void log_threaded_dest_driver_unregister_aggregated_stats(LogThreadedDestDriver *self);
307 
308 gboolean log_threaded_dest_driver_deinit_method(LogPipe *s);
309 gboolean log_threaded_dest_driver_init_method(LogPipe *s);
310 gboolean log_threaded_dest_driver_start_workers(LogPipe *s);
311 
312 void log_threaded_dest_driver_init_instance(LogThreadedDestDriver *self, GlobalConfig *cfg);
313 void log_threaded_dest_driver_free(LogPipe *s);
314 
315 void log_threaded_dest_driver_set_max_retries_on_error(LogDriver *s, gint max_retries);
316 void log_threaded_dest_driver_set_num_workers(LogDriver *s, gint num_workers);
317 void log_threaded_dest_driver_set_worker_partition_key_ref(LogDriver *s, LogTemplate *key);
318 void log_threaded_dest_driver_set_flush_on_worker_key_change(LogDriver *s, gboolean f);
319 void log_threaded_dest_driver_set_batch_lines(LogDriver *s, gint batch_lines);
320 void log_threaded_dest_driver_set_batch_timeout(LogDriver *s, gint batch_timeout);
321 void log_threaded_dest_driver_set_time_reopen(LogDriver *s, time_t time_reopen);
322 gboolean log_threaded_dest_driver_process_flag(LogDriver *driver, const gchar *flag);
323 
324 #endif
GProcessMode mode
Definition: gprocess.c:118
@ LM_TS_RECVD
Definition: logmsg.h:70
@ LF_LOCAL
Definition: logmsg.h:161
void log_threaded_dest_driver_init_instance(LogThreadedDestDriver *self, GlobalConfig *cfg)
Definition: logthrdestdrv.c:1493
void log_threaded_dest_driver_free(LogPipe *s)
Definition: logthrdestdrv.c:1483
void log_threaded_dest_worker_deinit_method(LogThreadedDestWorker *self)
Definition: logthrdestdrv.c:976
void log_threaded_dest_worker_drop_messages(LogThreadedDestWorker *self, gint batch_size)
Definition: logthrdestdrv.c:112
void log_threaded_dest_driver_set_max_retries_on_error(LogDriver *s, gint max_retries)
Definition: logthrdestdrv.c:1137
gboolean log_threaded_dest_worker_init_method(LogThreadedDestWorker *self)
Definition: logthrdestdrv.c:964
void log_threaded_dest_driver_unregister_aggregated_stats(LogThreadedDestDriver *self)
Definition: logthrdestdrv.c:1240
void log_threaded_dest_driver_register_aggregated_stats(LogThreadedDestDriver *self)
Definition: logthrdestdrv.c:1202
LogThreadedResult
Definition: logthrdestdrv.h:56
@ LTR_EXPLICIT_ACK_MGMT
Definition: logthrdestdrv.h:59
@ LTR_DROP
Definition: logthrdestdrv.h:57
@ LTR_RETRY
Definition: logthrdestdrv.h:63
@ LTR_MAX
Definition: logthrdestdrv.h:64
@ LTR_SUCCESS
Definition: logthrdestdrv.h:60
@ LTR_NOT_CONNECTED
Definition: logthrdestdrv.h:62
@ LTR_QUEUED
Definition: logthrdestdrv.h:61
@ LTR_ERROR
Definition: logthrdestdrv.h:58
void log_threaded_dest_worker_free(LogThreadedDestWorker *self)
Definition: logthrdestdrv.c:1014
void log_threaded_dest_driver_set_time_reopen(LogDriver *s, time_t time_reopen)
Definition: logthrdestdrv.c:75
void log_threaded_dest_worker_rewind_messages(LogThreadedDestWorker *self, gint batch_size)
Definition: logthrdestdrv.c:121
gboolean log_threaded_dest_driver_start_workers(LogPipe *s)
Definition: logthrdestdrv.c:1434
void log_threaded_dest_worker_free_method(LogThreadedDestWorker *self)
Definition: logthrdestdrv.c:983
gboolean log_threaded_dest_driver_deinit_method(LogPipe *s)
Definition: logthrdestdrv.c:1463
void log_threaded_dest_driver_set_batch_lines(LogDriver *s, gint batch_lines)
Definition: logthrdestdrv.c:59
gboolean log_threaded_dest_driver_process_flag(LogDriver *driver, const gchar *flag)
Definition: logthrdestdrv.c:93
void log_threaded_dest_worker_init_instance(LogThreadedDestWorker *self, LogThreadedDestDriver *owner, gint worker_index)
Definition: logthrdestdrv.c:991
void log_threaded_dest_driver_set_flush_on_worker_key_change(LogDriver *s, gboolean f)
Definition: logthrdestdrv.c:1041
const gchar * log_threaded_result_to_str(LogThreadedResult self)
Definition: logthrdestdrv.c:40
gboolean log_threaded_dest_driver_init_method(LogPipe *s)
Definition: logthrdestdrv.c:1387
void log_threaded_dest_worker_ack_messages(LogThreadedDestWorker *self, gint batch_size)
Definition: logthrdestdrv.c:103
@ LTDF_SEQNUM_ALL
Definition: logthrdestdrv.h:69
@ LTDF_SEQNUM
Definition: logthrdestdrv.h:70
void log_threaded_dest_driver_set_num_workers(LogDriver *s, gint num_workers)
Definition: logthrdestdrv.c:1024
void log_threaded_dest_driver_insert_msg_length_stats(LogThreadedDestDriver *self, gsize len)
Definition: logthrdestdrv.c:1188
void log_threaded_dest_worker_wakeup_when_suspended(LogThreadedDestWorker *self)
Definition: logthrdestdrv.c:663
void log_threaded_dest_worker_written_bytes_add(LogThreadedDestWorker *self, gsize b)
Definition: logthrdestdrv.c:1182
void log_threaded_dest_driver_set_batch_timeout(LogDriver *s, gint batch_timeout)
Definition: logthrdestdrv.c:67
LogThreadedFlushMode
Definition: logthrdestdrv.h:44
@ LTF_FLUSH_EXPEDITE
Definition: logthrdestdrv.h:52
@ LTF_FLUSH_NORMAL
Definition: logthrdestdrv.h:48
void log_threaded_dest_driver_set_worker_partition_key_ref(LogDriver *s, LogTemplate *key)
Definition: logthrdestdrv.c:1032
void log_threaded_dest_driver_insert_batch_length_stats(LogThreadedDestDriver *self, gsize len)
Definition: logthrdestdrv.c:1195
#define self
Definition: rcptid.c:38
Definition: stats-compat.h:55
Definition: stats-counter.h:67
Definition: logthrdestdrv.h:134
void(* disconnect)(LogThreadedDestDriver *s)
Definition: logthrdestdrv.h:174
struct _LogThreadedDestDriver::@59 metrics
gboolean flush_on_key_change
Definition: logthrdestdrv.h:184
StatsClusterKey * output_events_sc_key
Definition: logthrdestdrv.h:139
void(* thread_deinit)(LogThreadedDestDriver *s)
Definition: logthrdestdrv.h:172
StatsAggregator * CPS
Definition: logthrdestdrv.h:152
guint32 flags
Definition: logthrdestdrv.h:194
gint batch_timeout
Definition: logthrdestdrv.h:156
gint batch_lines
Definition: logthrdestdrv.h:155
StatsAggregator * average_batch_size
Definition: logthrdestdrv.h:151
gint num_workers
Definition: logthrdestdrv.h:180
LogThreadedResult(* insert)(LogThreadedDestDriver *s, LogMessage *msg)
Definition: logthrdestdrv.h:175
time_t time_reopen
Definition: logthrdestdrv.h:158
gint created_workers
Definition: logthrdestdrv.h:181
LogThreadedDestWorker instance
Definition: logthrdestdrv.h:170
StatsCounterItem * processed_messages
Definition: logthrdestdrv.h:144
StatsAggregator * max_message_size
Definition: logthrdestdrv.h:148
StatsAggregator * max_batch_size
Definition: logthrdestdrv.h:150
LogDestDriver super
Definition: logthrdestdrv.h:135
LogTemplate * worker_partition_key
Definition: logthrdestdrv.h:185
StatsAggregator * average_messages_size
Definition: logthrdestdrv.h:149
guint last_worker
Definition: logthrdestdrv.h:182
gboolean under_termination
Definition: logthrdestdrv.h:157
LogThreadedDestWorker ** workers
Definition: logthrdestdrv.h:179
struct _LogThreadedDestDriver::@60 worker
void(* thread_init)(LogThreadedDestDriver *s)
Definition: logthrdestdrv.h:171
StatsCounterItem * written_messages
Definition: logthrdestdrv.h:145
StatsClusterKey * processed_sc_key
Definition: logthrdestdrv.h:140
gint stats_source
Definition: logthrdestdrv.h:186
StatsCounterItem * output_event_retries
Definition: logthrdestdrv.h:146
StatsClusterKey * output_event_retries_sc_key
Definition: logthrdestdrv.h:141
StatsCounterItem * dropped_messages
Definition: logthrdestdrv.h:143
gint32 shared_seq_num
Definition: logthrdestdrv.h:193
LogThreadedResult(* flush)(LogThreadedDestDriver *s)
Definition: logthrdestdrv.h:176
gboolean(* connect)(LogThreadedDestDriver *s)
Definition: logthrdestdrv.h:173
gint retries_on_error_max
Definition: logthrdestdrv.h:159
guint retries_max
Definition: logthrdestdrv.h:160
Definition: logthrdestdrv.h:78
struct _LogThreadedDestWorker::@58 metrics
StatsCounterItem * message_delay_sample_age
Definition: logthrdestdrv.h:117
struct iv_task do_work
Definition: logthrdestdrv.h:81
gboolean(* init)(LogThreadedDestWorker *s)
Definition: logthrdestdrv.h:122
LogThreadedResult(* flush)(LogThreadedDestWorker *s, LogThreadedFlushMode mode)
Definition: logthrdestdrv.h:127
struct iv_timer timer_reopen
Definition: logthrdestdrv.h:84
struct iv_event wake_up_event
Definition: logthrdestdrv.h:82
gint worker_index
Definition: logthrdestdrv.h:90
void(* disconnect)(LogThreadedDestWorker *s)
Definition: logthrdestdrv.h:125
gboolean connected
Definition: logthrdestdrv.h:91
GString * last_key
Definition: logthrdestdrv.h:104
LogThreadedResult(* insert)(LogThreadedDestWorker *s, LogMessage *msg)
Definition: logthrdestdrv.h:126
struct iv_timer timer_throttle
Definition: logthrdestdrv.h:85
void(* deinit)(LogThreadedDestWorker *s)
Definition: logthrdestdrv.h:123
gint32 seq_num
Definition: logthrdestdrv.h:96
StatsClusterKey * output_event_bytes_sc_key
Definition: logthrdestdrv.h:109
gint64 last_delay_update
Definition: logthrdestdrv.h:119
struct _LogThreadedDestWorker::@57 partitioning
gboolean suspended
Definition: logthrdestdrv.h:99
LogQueue * queue
Definition: logthrdestdrv.h:80
struct iv_event shutdown_event
Definition: logthrdestdrv.h:83
gint rewound_batch_size
Definition: logthrdestdrv.h:93
StatsByteCounter written_bytes
Definition: logthrdestdrv.h:114
StatsCounterItem * output_unreachable
Definition: logthrdestdrv.h:115
struct timespec last_flush_time
Definition: logthrdestdrv.h:97
LogThreadedDestDriver * owner
Definition: logthrdestdrv.h:88
struct iv_timer timer_flush
Definition: logthrdestdrv.h:86
guint retries_counter
Definition: logthrdestdrv.h:95
gint batch_size
Definition: logthrdestdrv.h:92
StatsClusterKey * message_delay_sample_age_key
Definition: logthrdestdrv.h:112
StatsCounterItem * message_delay_sample
Definition: logthrdestdrv.h:116
StatsClusterKey * output_unreachable_key
Definition: logthrdestdrv.h:110
gint retries_on_error_counter
Definition: logthrdestdrv.h:94
time_t time_reopen
Definition: logthrdestdrv.h:100
MainLoopThreadedWorker thread
Definition: logthrdestdrv.h:79
void(* free_fn)(LogThreadedDestWorker *s)
Definition: logthrdestdrv.h:128
StatsClusterKey * message_delay_sample_key
Definition: logthrdestdrv.h:111
gboolean(* connect)(LogThreadedDestWorker *s)
Definition: logthrdestdrv.h:124
gboolean enable_batching
Definition: logthrdestdrv.h:98
GlobalConfig * cfg
Definition: test_batched_ack_tracker.c:34
HTTPDestinationDriver * driver
Definition: test_http-signal_slot.c:35
GString * result
Definition: test_lexer_block.c:34
LogMessage * msg
Definition: test_rename.c:35
struct tm key
Definition: cache.c:63
gint64 unix_time_diff_in_msec(const UnixTime *a, const UnixTime *b)
Definition: unixtime.c:329
void unix_time_set_now(UnixTime *self)
Definition: unixtime.c:40