syslog-ng source
logthrsourcedrv.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Balabit
3  * Copyright (c) 2018 László Várady <laszlo.varady@balabit.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #ifndef LOGTHRSOURCEDRV_H
26 #define LOGTHRSOURCEDRV_H
27 
28 #include "syslog-ng.h"
29 #include "driver.h"
30 #include "logsource.h"
31 #include "cfg.h"
32 #include "logpipe.h"
33 #include "logmsg/logmsg.h"
34 #include "msg-format.h"
37 
38 typedef struct _LogThreadedSourceDriver LogThreadedSourceDriver;
39 typedef struct _LogThreadedSourceWorker LogThreadedSourceWorker;
40 
41 typedef struct _LogThreadedSourceWorkerOptions
42 {
45  AckTrackerFactory *ack_tracker_factory;
47 
48 typedef struct _WakeupCondition
49 {
50  GMutex lock;
51  GCond cond;
52  gboolean awoken;
54 
56 {
57  LogSource super;
58  MainLoopThreadedWorker thread;
59  LogThreadedSourceDriver *control;
63 
64  gboolean (*thread_init)(LogThreadedSourceWorker *self);
65  void (*thread_deinit)(LogThreadedSourceWorker *self);
66  void (*run)(LogThreadedSourceWorker *self);
67  void (*request_exit)(LogThreadedSourceWorker *self);
68  void (*wakeup)(LogThreadedSourceWorker *self);
69 };
70 
72 {
73  LogSrcDriver super;
75  LogThreadedSourceWorker **workers;
80 
81  void (*format_stats_key)(LogThreadedSourceDriver *self, StatsClusterKeyBuilder *kb);
82  LogThreadedSourceWorker *(*worker_construct)(LogThreadedSourceDriver *self, gint worker_index);
83 };
84 
87  const gchar *group_name, gint num_workers);
89 
90 void log_threaded_source_driver_set_transport_name(LogThreadedSourceDriver *self, const gchar *transport_name);
91 void log_threaded_source_driver_init_instance(LogThreadedSourceDriver *self, GlobalConfig *cfg);
92 gboolean log_threaded_source_driver_init_method(LogPipe *s);
93 gboolean log_threaded_source_driver_deinit_method(LogPipe *s);
95 
96 static inline void
97 log_threaded_source_driver_set_num_workers(LogDriver *s, gint num_workers)
98 {
99  LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
100  self->num_workers = num_workers;
101 }
102 
103 static inline LogSourceOptions *
104 log_threaded_source_driver_get_source_options(LogDriver *s)
105 {
106  LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
107 
108  return &self->worker_options.super;
109 }
110 
111 static inline MsgFormatOptions *
112 log_threaded_source_driver_get_parse_options(LogDriver *s)
113 {
114  LogThreadedSourceDriver *self = (LogThreadedSourceDriver *) s;
115 
116  return &self->worker_options.parse_options;
117 }
118 
119 /* Worker */
120 
121 void log_threaded_source_worker_init_instance(LogThreadedSourceWorker *self, LogThreadedSourceDriver *driver,
122  gint worker_index);
123 void log_threaded_source_worker_free(LogPipe *s);
124 
125 void log_threaded_source_worker_close_batch(LogThreadedSourceWorker *self);
126 
127 /* blocking API */
128 void log_threaded_source_worker_blocking_post(LogThreadedSourceWorker *self, LogMessage *msg);
129 
130 /* non-blocking API, use it wisely (thread boundaries); call close_batch() at least before suspending */
131 void log_threaded_source_worker_post(LogThreadedSourceWorker *self, LogMessage *msg);
132 gboolean log_threaded_source_worker_free_to_send(LogThreadedSourceWorker *self);
133 
134 #endif
void log_threaded_source_driver_set_transport_name(LogThreadedSourceDriver *self, const gchar *transport_name)
Definition: logthrsourcedrv.c:461
void log_threaded_source_worker_options_destroy(LogThreadedSourceWorkerOptions *options)
Definition: logthrsourcedrv.c:125
void log_threaded_source_worker_post(LogThreadedSourceWorker *self, LogMessage *msg)
Definition: logthrsourcedrv.c:415
void log_threaded_source_worker_close_batch(LogThreadedSourceWorker *self)
Definition: logthrsourcedrv.c:409
void log_threaded_source_worker_blocking_post(LogThreadedSourceWorker *self, LogMessage *msg)
Definition: logthrsourcedrv.c:436
void log_threaded_source_driver_init_instance(LogThreadedSourceDriver *self, GlobalConfig *cfg)
Definition: logthrsourcedrv.c:469
void log_threaded_source_worker_options_defaults(LogThreadedSourceWorkerOptions *options)
Definition: logthrsourcedrv.c:101
gboolean log_threaded_source_driver_init_method(LogPipe *s)
Definition: logthrsourcedrv.c:320
void log_threaded_source_driver_free_method(LogPipe *s)
Definition: logthrsourcedrv.c:350
void log_threaded_source_worker_options_init(LogThreadedSourceWorkerOptions *options, GlobalConfig *cfg, const gchar *group_name, gint num_workers)
Definition: logthrsourcedrv.c:110
void log_threaded_source_worker_free(LogPipe *s)
Definition: logthrsourcedrv.c:216
void log_threaded_source_worker_init_instance(LogThreadedSourceWorker *self, LogThreadedSourceDriver *driver, gint worker_index)
Definition: logthrsourcedrv.c:230
gboolean log_threaded_source_driver_deinit_method(LogPipe *s)
Definition: logthrsourcedrv.c:340
gboolean log_threaded_source_worker_free_to_send(LogThreadedSourceWorker *self)
Definition: logthrsourcedrv.c:430
Definition: logsource.h:36
Definition: logthrsourcedrv.h:42
LogSourceOptions super
Definition: logthrsourcedrv.h:43
AckTrackerFactory * ack_tracker_factory
Definition: logthrsourcedrv.h:45
MsgFormatOptions parse_options
Definition: logthrsourcedrv.h:44
Definition: msg-format.h:69
Definition: logthrsourcedrv.h:49
gboolean awoken
Definition: logthrsourcedrv.h:52
GCond cond
Definition: logthrsourcedrv.h:51
GMutex lock
Definition: logthrsourcedrv.h:50
Definition: logthrsourcedrv.h:72
LogThreadedSourceWorkerOptions worker_options
Definition: logthrsourcedrv.h:74
LogSrcDriver super
Definition: logthrsourcedrv.h:73
gboolean auto_close_batches
Definition: logthrsourcedrv.h:77
gint num_workers
Definition: logthrsourcedrv.h:76
void(* format_stats_key)(LogThreadedSourceDriver *self, StatsClusterKeyBuilder *kb)
Definition: logthrsourcedrv.h:81
gchar * transport_name
Definition: logthrsourcedrv.h:78
gsize transport_name_len
Definition: logthrsourcedrv.h:79
LogThreadedSourceWorker ** workers
Definition: logthrsourcedrv.h:75
Definition: logthrsourcedrv.h:56
MainLoopThreadedWorker thread
Definition: logthrsourcedrv.h:58
LogThreadedSourceDriver * control
Definition: logthrsourcedrv.h:59
LogSource super
Definition: logthrsourcedrv.h:57
void(* request_exit)(LogThreadedSourceWorker *self)
Definition: logthrsourcedrv.h:67
gboolean under_termination
Definition: logthrsourcedrv.h:61
gboolean(* thread_init)(LogThreadedSourceWorker *self)
Definition: logthrsourcedrv.h:64
void(* run)(LogThreadedSourceWorker *self)
Definition: logthrsourcedrv.h:66
WakeupCondition wakeup_cond
Definition: logthrsourcedrv.h:60
void(* thread_deinit)(LogThreadedSourceWorker *self)
Definition: logthrsourcedrv.h:65
void(* wakeup)(LogThreadedSourceWorker *self)
Definition: logthrsourcedrv.h:68
gint worker_index
Definition: logthrsourcedrv.h:62
GlobalConfig * cfg
Definition: test_batched_ack_tracker.c:34
HTTPDestinationDriver * driver
Definition: test_http-signal_slot.c:35
LogMessage * msg
Definition: test_rename.c:35