25 #ifndef LOGQUEUE_H_INCLUDED
26 #define LOGQUEUE_H_INCLUDED
38 typedef struct _LogQueueMetrics
53 StatsClusterKey *memory_usage_sc_key;
84 LogMessage *(*pop_head)(LogQueue *
self, LogPathOptions *
path_options);
85 LogMessage *(*peek_head)(LogQueue *
self);
93 static inline gboolean
94 log_queue_keep_on_reload(LogQueue *
self)
96 if (
self->keep_on_reload)
97 return self->keep_on_reload(
self);
102 log_queue_get_length(LogQueue *
self)
104 return self->get_length(
self);
107 static inline gboolean
108 log_queue_is_empty_racy(LogQueue *
self)
110 if (
self->is_empty_racy)
111 return self->is_empty_racy(
self);
113 return (
self->get_length(
self) == 0);
117 log_queue_push_tail(LogQueue *
self, LogMessage *
msg,
const LogPathOptions *
path_options)
122 static inline LogMessage *
123 log_queue_pop_head(LogQueue *
self, LogPathOptions *
path_options)
125 LogMessage *
msg = NULL;
127 if (
self->throttle &&
self->throttle_buckets == 0)
132 if (
msg &&
self->throttle_buckets > 0)
133 self->throttle_buckets--;
138 static inline LogMessage *
139 log_queue_peek_head(LogQueue *
self)
141 return self->peek_head(
self);
144 static inline LogMessage *
145 log_queue_pop_head_ignore_throttle(LogQueue *
self, LogPathOptions *
path_options)
151 log_queue_rewind_backlog(LogQueue *
self, guint rewind_count)
153 self->rewind_backlog(
self, rewind_count);
157 log_queue_rewind_backlog_all(LogQueue *
self)
159 self->rewind_backlog_all(
self);
163 log_queue_ack_backlog(LogQueue *
self, guint rewind_count)
165 self->ack_backlog(
self, rewind_count);
168 static inline LogQueue *
169 log_queue_ref(LogQueue *
self)
171 g_assert(!
self || g_atomic_counter_get(&
self->ref_cnt) > 0);
175 g_atomic_counter_inc(&
self->ref_cnt);
181 log_queue_unref(LogQueue *
self)
183 g_assert(!
self || g_atomic_counter_get(&
self->ref_cnt) > 0);
185 if (
self && g_atomic_counter_dec_and_test(&
self->ref_cnt))
193 log_queue_set_throttle(LogQueue *
self, gint throttle)
195 self->throttle = throttle;
196 self->throttle_buckets = throttle;
199 static inline gboolean
200 log_queue_has_type(LogQueue *
self,
QueueType type)
202 return g_strcmp0(
self->type, type) == 0;
219 GDestroyNotify user_data_destroy);
221 gpointer user_data, GDestroyNotify user_data_destroy);
226 StatsClusterKeyBuilder *driver_sck_builder,
227 StatsClusterKeyBuilder *queue_sck_builder);
const char * QueueType
Definition: logqueue.h:36
void log_queue_push_notify(LogQueue *self)
Definition: logqueue.c:102
void log_queue_queued_messages_dec(LogQueue *self)
Definition: logqueue.c:67
void log_queue_reset_parallel_push(LogQueue *self)
Definition: logqueue.c:126
void log_queue_queued_messages_reset(LogQueue *self)
Definition: logqueue.c:74
void log_queue_set_parallel_push(LogQueue *self, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy)
Definition: logqueue.c:135
void log_queue_memory_usage_add(LogQueue *self, gsize value)
Definition: logqueue.c:32
void log_queue_queued_messages_add(LogQueue *self, gsize value)
Definition: logqueue.c:46
void log_queue_queued_messages_sub(LogQueue *self, gsize value)
Definition: logqueue.c:53
void log_queue_mark_as_abandoned(LogQueue *self)
Definition: logqueue.c:399
void log_queue_free_method(LogQueue *self)
Definition: logqueue.c:418
void(* LogQueuePushNotifyFunc)(gpointer user_data)
Definition: logqueue.h:32
void log_queue_dropped_messages_inc(LogQueue *self)
Definition: logqueue.c:85
void log_queue_memory_usage_sub(LogQueue *self, gsize value)
Definition: logqueue.c:39
void log_queue_init_instance(LogQueue *self, const gchar *persist_name, gint stats_level, StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder)
Definition: logqueue.c:405
gboolean log_queue_check_items(LogQueue *self, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy)
Definition: logqueue.c:152
void log_queue_queued_messages_inc(LogQueue *self)
Definition: logqueue.c:60
#define self
Definition: rcptid.c:38
Definition: logqueue.h:39
StatsClusterKey * output_events_sc_key
Definition: logqueue.h:42
StatsClusterKey * memory_usage_sc_key
Definition: logqueue.h:43
StatsCounterItem * memory_usage
Definition: logqueue.h:47
StatsClusterKey * events_sc_key
Definition: logqueue.h:52
StatsCounterItem * dropped_messages
Definition: logqueue.h:46
StatsCounterItem * queued_messages
Definition: logqueue.h:45
Definition: stats-counter.h:67
Definition: logqueue.h:61
gboolean(* keep_on_reload)(LogQueue *self)
Definition: logqueue.h:80
GAtomicCounter ref_cnt
Definition: logqueue.h:63
struct timespec last_throttle_check
Definition: logqueue.h:67
GMutex lock
Definition: logqueue.h:74
gboolean(* is_empty_racy)(LogQueue *self)
Definition: logqueue.h:82
gint throttle_buckets
Definition: logqueue.h:66
GDestroyNotify parallel_push_data_destroy
Definition: logqueue.h:77
QueueType type
Definition: logqueue.h:62
gint throttle
Definition: logqueue.h:65
gboolean abandoned
Definition: logqueue.h:72
void(* rewind_backlog)(LogQueue *self, guint rewind_count)
Definition: logqueue.h:87
void(* free_fn)(LogQueue *self)
Definition: logqueue.h:90
gpointer parallel_push_data
Definition: logqueue.h:76
LogQueuePushNotifyFunc parallel_push_notify
Definition: logqueue.h:75
void(* ack_backlog)(LogQueue *self, gint n)
Definition: logqueue.h:86
LogQueueMetrics metrics
Definition: logqueue.h:71
void(* push_tail)(LogQueue *self, LogMessage *msg, const LogPathOptions *path_options)
Definition: logqueue.h:83
gchar * persist_name
Definition: logqueue.h:69
void(* rewind_backlog_all)(LogQueue *self)
Definition: logqueue.h:88
gint64(* get_length)(LogQueue *self)
Definition: logqueue.h:81
GString * value
Definition: test_decode.c:28
LogMessage * msg
Definition: test_rename.c:35
LogPathOptions path_options
Definition: test_wildcard_file_reader.c:62