syslog-ng source
logqueue.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2002-2011 Balabit
3  * Copyright (c) 1998-2011 Balázs Scheidler
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #ifndef LOGQUEUE_H_INCLUDED
26 #define LOGQUEUE_H_INCLUDED
27 
28 #include "logmsg/logmsg.h"
29 #include "stats/stats-registry.h"
31 
32 typedef void (*LogQueuePushNotifyFunc)(gpointer user_data);
33 
34 typedef struct _LogQueue LogQueue;
35 
36 typedef const char *QueueType;
37 
38 typedef struct _LogQueueMetrics
39 {
40  struct
41  {
42  StatsClusterKey *output_events_sc_key;
43  StatsClusterKey *memory_usage_sc_key;
44 
48  } shared;
49 
50  struct
51  {
52  StatsClusterKey *events_sc_key;
53  StatsClusterKey *memory_usage_sc_key;
54 
55  StatsCounterItem *memory_usage;
56  StatsCounterItem *queued_messages;
57  } owned;
59 
60 struct _LogQueue
61 {
64 
65  gint throttle;
67  struct timespec last_throttle_check;
68 
69  gchar *persist_name;
70 
72  gboolean abandoned;
73 
74  GMutex lock;
78 
79  /* queue management */
80  gboolean (*keep_on_reload)(LogQueue *self);
81  gint64 (*get_length)(LogQueue *self);
82  gboolean (*is_empty_racy)(LogQueue *self);
83  void (*push_tail)(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options);
84  LogMessage *(*pop_head)(LogQueue *self, LogPathOptions *path_options);
85  LogMessage *(*peek_head)(LogQueue *self);
86  void (*ack_backlog)(LogQueue *self, gint n);
87  void (*rewind_backlog)(LogQueue *self, guint rewind_count);
88  void (*rewind_backlog_all)(LogQueue *self);
89 
90  void (*free_fn)(LogQueue *self);
91 };
92 
93 static inline gboolean
94 log_queue_keep_on_reload(LogQueue *self)
95 {
96  if (self->keep_on_reload)
97  return self->keep_on_reload(self);
98  return TRUE;
99 }
100 
101 static inline gint64
102 log_queue_get_length(LogQueue *self)
103 {
104  return self->get_length(self);
105 }
106 
107 static inline gboolean
108 log_queue_is_empty_racy(LogQueue *self)
109 {
110  if (self->is_empty_racy)
111  return self->is_empty_racy(self);
112  else
113  return (self->get_length(self) == 0);
114 }
115 
116 static inline void
117 log_queue_push_tail(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options)
118 {
119  self->push_tail(self, msg, path_options);
120 }
121 
122 static inline LogMessage *
123 log_queue_pop_head(LogQueue *self, LogPathOptions *path_options)
124 {
125  LogMessage *msg = NULL;
126 
127  if (self->throttle && self->throttle_buckets == 0)
128  return NULL;
129 
130  msg = self->pop_head(self, path_options);
131 
132  if (msg && self->throttle_buckets > 0)
133  self->throttle_buckets--;
134 
135  return msg;
136 }
137 
138 static inline LogMessage *
139 log_queue_peek_head(LogQueue *self)
140 {
141  return self->peek_head(self);
142 }
143 
144 static inline LogMessage *
145 log_queue_pop_head_ignore_throttle(LogQueue *self, LogPathOptions *path_options)
146 {
147  return self->pop_head(self, path_options);
148 }
149 
150 static inline void
151 log_queue_rewind_backlog(LogQueue *self, guint rewind_count)
152 {
153  self->rewind_backlog(self, rewind_count);
154 }
155 
156 static inline void
157 log_queue_rewind_backlog_all(LogQueue *self)
158 {
159  self->rewind_backlog_all(self);
160 }
161 
162 static inline void
163 log_queue_ack_backlog(LogQueue *self, guint rewind_count)
164 {
165  self->ack_backlog(self, rewind_count);
166 }
167 
168 static inline LogQueue *
169 log_queue_ref(LogQueue *self)
170 {
171  g_assert(!self || g_atomic_counter_get(&self->ref_cnt) > 0);
172 
173  if (self)
174  {
175  g_atomic_counter_inc(&self->ref_cnt);
176  }
177  return self;
178 }
179 
180 static inline void
181 log_queue_unref(LogQueue *self)
182 {
183  g_assert(!self || g_atomic_counter_get(&self->ref_cnt) > 0);
184 
185  if (self && g_atomic_counter_dec_and_test(&self->ref_cnt))
186  {
187  if (self->free_fn)
188  self->free_fn(self);
189  }
190 }
191 
192 static inline void
193 log_queue_set_throttle(LogQueue *self, gint throttle)
194 {
195  self->throttle = throttle;
196  self->throttle_buckets = throttle;
197 }
198 
199 static inline gboolean
200 log_queue_has_type(LogQueue *self, QueueType type)
201 {
202  return g_strcmp0(self->type, type) == 0;
203 }
204 
205 void log_queue_memory_usage_add(LogQueue *self, gsize value);
206 void log_queue_memory_usage_sub(LogQueue *self, gsize value);
207 
208 void log_queue_queued_messages_add(LogQueue *self, gsize value);
209 void log_queue_queued_messages_sub(LogQueue *self, gsize value);
210 void log_queue_queued_messages_inc(LogQueue *self);
211 void log_queue_queued_messages_dec(LogQueue *self);
212 void log_queue_queued_messages_reset(LogQueue *self);
213 
214 void log_queue_dropped_messages_inc(LogQueue *self);
215 
216 void log_queue_push_notify(LogQueue *self);
217 void log_queue_reset_parallel_push(LogQueue *self);
218 void log_queue_set_parallel_push(LogQueue *self, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data,
219  GDestroyNotify user_data_destroy);
220 gboolean log_queue_check_items(LogQueue *self, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify,
221  gpointer user_data, GDestroyNotify user_data_destroy);
222 
223 void log_queue_mark_as_abandoned(LogQueue *self);
224 
225 void log_queue_init_instance(LogQueue *self, const gchar *persist_name, gint stats_level,
226  StatsClusterKeyBuilder *driver_sck_builder,
227  StatsClusterKeyBuilder *queue_sck_builder);
228 
229 void log_queue_free_method(LogQueue *self);
230 
231 #endif
const char * QueueType
Definition: logqueue.h:36
void log_queue_push_notify(LogQueue *self)
Definition: logqueue.c:102
void log_queue_queued_messages_dec(LogQueue *self)
Definition: logqueue.c:67
void log_queue_reset_parallel_push(LogQueue *self)
Definition: logqueue.c:126
void log_queue_queued_messages_reset(LogQueue *self)
Definition: logqueue.c:74
void log_queue_set_parallel_push(LogQueue *self, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy)
Definition: logqueue.c:135
void log_queue_memory_usage_add(LogQueue *self, gsize value)
Definition: logqueue.c:32
void log_queue_queued_messages_add(LogQueue *self, gsize value)
Definition: logqueue.c:46
void log_queue_queued_messages_sub(LogQueue *self, gsize value)
Definition: logqueue.c:53
void log_queue_mark_as_abandoned(LogQueue *self)
Definition: logqueue.c:399
void log_queue_free_method(LogQueue *self)
Definition: logqueue.c:418
void(* LogQueuePushNotifyFunc)(gpointer user_data)
Definition: logqueue.h:32
void log_queue_dropped_messages_inc(LogQueue *self)
Definition: logqueue.c:85
void log_queue_memory_usage_sub(LogQueue *self, gsize value)
Definition: logqueue.c:39
void log_queue_init_instance(LogQueue *self, const gchar *persist_name, gint stats_level, StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder)
Definition: logqueue.c:405
gboolean log_queue_check_items(LogQueue *self, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy)
Definition: logqueue.c:152
void log_queue_queued_messages_inc(LogQueue *self)
Definition: logqueue.c:60
#define self
Definition: rcptid.c:38
Definition: atomic.h:31
Definition: logqueue.h:39
StatsClusterKey * output_events_sc_key
Definition: logqueue.h:42
StatsClusterKey * memory_usage_sc_key
Definition: logqueue.h:43
StatsCounterItem * memory_usage
Definition: logqueue.h:47
StatsClusterKey * events_sc_key
Definition: logqueue.h:52
StatsCounterItem * dropped_messages
Definition: logqueue.h:46
StatsCounterItem * queued_messages
Definition: logqueue.h:45
Definition: stats-counter.h:67
Definition: logqueue.h:61
gboolean(* keep_on_reload)(LogQueue *self)
Definition: logqueue.h:80
GAtomicCounter ref_cnt
Definition: logqueue.h:63
struct timespec last_throttle_check
Definition: logqueue.h:67
GMutex lock
Definition: logqueue.h:74
gboolean(* is_empty_racy)(LogQueue *self)
Definition: logqueue.h:82
gint throttle_buckets
Definition: logqueue.h:66
GDestroyNotify parallel_push_data_destroy
Definition: logqueue.h:77
QueueType type
Definition: logqueue.h:62
gint throttle
Definition: logqueue.h:65
gboolean abandoned
Definition: logqueue.h:72
void(* rewind_backlog)(LogQueue *self, guint rewind_count)
Definition: logqueue.h:87
void(* free_fn)(LogQueue *self)
Definition: logqueue.h:90
gpointer parallel_push_data
Definition: logqueue.h:76
LogQueuePushNotifyFunc parallel_push_notify
Definition: logqueue.h:75
void(* ack_backlog)(LogQueue *self, gint n)
Definition: logqueue.h:86
LogQueueMetrics metrics
Definition: logqueue.h:71
void(* push_tail)(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options)
Definition: logqueue.h:83
gchar * persist_name
Definition: logqueue.h:69
void(* rewind_backlog_all)(LogQueue *self)
Definition: logqueue.h:88
gint64(* get_length)(LogQueue *self)
Definition: logqueue.h:81
GString * value
Definition: test_decode.c:28
LogMessage * msg
Definition: test_rename.c:35
LogPathOptions path_options
Definition: test_wildcard_file_reader.c:62