1 //
2 // Copyright(c) 2015 Gabi Melman.
3 // Distributed under the MIT License (http://opensource.org/licenses/MIT)
4 //
5
6 // async log helper :
7 // Process logs asynchronously using a back thread.
8 //
9 // If the internal queue of log messages reaches its max size,
10 // then the client call will block until there is more room.
11 //
12 // If the back thread throws during logging, a spdlog::spdlog_ex exception
13 // will be thrown in client's thread when tries to log the next message
14
15 #pragma once
16
17 #include <spdlog/common.h>
18 #include <spdlog/sinks/sink.h>
19 #include <spdlog/details/mpmc_bounded_q.h>
20 #include <spdlog/details/log_msg.h>
21 #include <spdlog/details/os.h>
22 #include <spdlog/formatter.h>
23
24 #include <chrono>
25 #include <exception>
26 #include <functional>
27 #include <memory>
28 #include <string>
29 #include <thread>
30 #include <utility>
31 #include <vector>
32
33 namespace spdlog
34 {
35 namespace details
36 {
37
38 class async_log_helper
39 {
40 // Async msg to move to/from the queue
41 // Movable only. should never be copied
42 enum class async_msg_type
43 {
44 log,
45 flush,
46 terminate
47 };
48 struct async_msg
49 {
50 std::string logger_name;
51 level::level_enum level;
52 log_clock::time_point time;
53 size_t thread_id;
54 std::string txt;
55 async_msg_type msg_type;
56
57 async_msg() = default;
58 ~async_msg() = default;
59
60
async_msgasync_msg61 async_msg(async_msg&& other) SPDLOG_NOEXCEPT:
62 logger_name(std::move(other.logger_name)),
63 level(std::move(other.level)),
64 time(std::move(other.time)),
65 txt(std::move(other.txt)),
66 msg_type(std::move(other.msg_type))
67 {}
68
async_msgasync_msg69 async_msg(async_msg_type m_type) :msg_type(m_type)
70 {}
71
72 async_msg& operator=(async_msg&& other) SPDLOG_NOEXCEPT
73 {
74 logger_name = std::move(other.logger_name);
75 level = other.level;
76 time = std::move(other.time);
77 thread_id = other.thread_id;
78 txt = std::move(other.txt);
79 msg_type = other.msg_type;
80 return *this;
81 }
82
83 // never copy or assign. should only be moved..
84 async_msg(const async_msg&) = delete;
85 async_msg& operator=(async_msg& other) = delete;
86
87 // construct from log_msg
async_msgasync_msg88 async_msg(const details::log_msg& m) :
89 level(m.level),
90 time(m.time),
91 thread_id(m.thread_id),
92 txt(m.raw.data(), m.raw.size()),
93 msg_type(async_msg_type::log)
94 {
95 #ifndef SPDLOG_NO_NAME
96 logger_name = *m.logger_name;
97 #endif
98 }
99
100
101 // copy into log_msg
fill_log_msgasync_msg102 void fill_log_msg(log_msg &msg)
103 {
104 msg.logger_name = &logger_name;
105 msg.level = level;
106 msg.time = time;
107 msg.thread_id = thread_id;
108 msg.raw << txt;
109 }
110 };
111
112 public:
113
114 using item_type = async_msg;
115 using q_type = details::mpmc_bounded_queue<item_type>;
116
117 using clock = std::chrono::steady_clock;
118
119
120 async_log_helper(formatter_ptr formatter,
121 const std::vector<sink_ptr>& sinks,
122 size_t queue_size,
123 const log_err_handler err_handler,
124 const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
125 const std::function<void()>& worker_warmup_cb = nullptr,
126 const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(),
127 const std::function<void()>& worker_teardown_cb = nullptr);
128
129 void log(const details::log_msg& msg);
130
131 // stop logging and join the back thread
132 ~async_log_helper();
133
134 void set_formatter(formatter_ptr);
135
136 void flush(bool wait_for_q);
137
138
139 private:
140 formatter_ptr _formatter;
141 std::vector<std::shared_ptr<sinks::sink>> _sinks;
142
143 // queue of messages to log
144 q_type _q;
145
146 log_err_handler _err_handler;
147
148 bool _flush_requested;
149
150 bool _terminate_requested;
151
152
153 // overflow policy
154 const async_overflow_policy _overflow_policy;
155
156 // worker thread warmup callback - one can set thread priority, affinity, etc
157 const std::function<void()> _worker_warmup_cb;
158
159 // auto periodic sink flush parameter
160 const std::chrono::milliseconds _flush_interval_ms;
161
162 // worker thread teardown callback
163 const std::function<void()> _worker_teardown_cb;
164
165 // worker thread
166 std::thread _worker_thread;
167
168 void push_msg(async_msg&& new_msg);
169
170 // worker thread main loop
171 void worker_loop();
172
173 // pop next message from the queue and process it. will set the last_pop to the pop time
174 // return false if termination of the queue is required
175 bool process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush);
176
177 void handle_flush_interval(log_clock::time_point& now, log_clock::time_point& last_flush);
178
179 // sleep,yield or return immediatly using the time passed since last message as a hint
180 static void sleep_or_yield(const spdlog::log_clock::time_point& now, const log_clock::time_point& last_op_time);
181
182 // wait until the queue is empty
183 void wait_empty_q();
184
185 };
186 }
187 }
188
189 ///////////////////////////////////////////////////////////////////////////////
190 // async_sink class implementation
191 ///////////////////////////////////////////////////////////////////////////////
async_log_helper(formatter_ptr formatter,const std::vector<sink_ptr> & sinks,size_t queue_size,log_err_handler err_handler,const async_overflow_policy overflow_policy,const std::function<void ()> & worker_warmup_cb,const std::chrono::milliseconds & flush_interval_ms,const std::function<void ()> & worker_teardown_cb)192 inline spdlog::details::async_log_helper::async_log_helper(
193 formatter_ptr formatter,
194 const std::vector<sink_ptr>& sinks,
195 size_t queue_size,
196 log_err_handler err_handler,
197 const async_overflow_policy overflow_policy,
198 const std::function<void()>& worker_warmup_cb,
199 const std::chrono::milliseconds& flush_interval_ms,
200 const std::function<void()>& worker_teardown_cb):
201 _formatter(formatter),
202 _sinks(sinks),
203 _q(queue_size),
204 _err_handler(err_handler),
205 _flush_requested(false),
206 _terminate_requested(false),
207 _overflow_policy(overflow_policy),
208 _worker_warmup_cb(worker_warmup_cb),
209 _flush_interval_ms(flush_interval_ms),
210 _worker_teardown_cb(worker_teardown_cb),
211 _worker_thread(&async_log_helper::worker_loop, this)
212 {}
213
214 // Send to the worker thread termination message(level=off)
215 // and wait for it to finish gracefully
~async_log_helper()216 inline spdlog::details::async_log_helper::~async_log_helper()
217 {
218 try
219 {
220 push_msg(async_msg(async_msg_type::terminate));
221 _worker_thread.join();
222 }
223 catch (...) // don't crash in destructor
224 {}
225 }
226
227
228 //Try to push and block until succeeded (if the policy is not to discard when the queue is full)
log(const details::log_msg & msg)229 inline void spdlog::details::async_log_helper::log(const details::log_msg& msg)
230 {
231 push_msg(async_msg(msg));
232
233
234 }
235
push_msg(details::async_log_helper::async_msg && new_msg)236 inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg&& new_msg)
237 {
238 if (!_q.enqueue(std::move(new_msg)) && _overflow_policy != async_overflow_policy::discard_log_msg)
239 {
240 auto last_op_time = details::os::now();
241 auto now = last_op_time;
242 do
243 {
244 now = details::os::now();
245 sleep_or_yield(now, last_op_time);
246 }
247 while (!_q.enqueue(std::move(new_msg)));
248 }
249
250 }
251
252 // optionally wait for the queue be empty and request flush from the sinks
flush(bool wait_for_q)253 inline void spdlog::details::async_log_helper::flush(bool wait_for_q)
254 {
255 push_msg(async_msg(async_msg_type::flush));
256 if(wait_for_q)
257 wait_empty_q(); //return only make after the above flush message was processed
258 }
259
worker_loop()260 inline void spdlog::details::async_log_helper::worker_loop()
261 {
262 try
263 {
264 if (_worker_warmup_cb) _worker_warmup_cb();
265 auto last_pop = details::os::now();
266 auto last_flush = last_pop;
267 while(process_next_msg(last_pop, last_flush));
268 if (_worker_teardown_cb) _worker_teardown_cb();
269 }
270 catch (const std::exception &ex)
271 {
272 _err_handler(ex.what());
273 }
274 catch (...)
275 {
276 _err_handler("Unknown exception");
277 }
278 }
279
280 // process next message in the queue
281 // return true if this thread should still be active (while no terminate msg was received)
process_next_msg(log_clock::time_point & last_pop,log_clock::time_point & last_flush)282 inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush)
283 {
284
285 async_msg incoming_async_msg;
286
287
288 if (_q.dequeue(incoming_async_msg))
289 {
290 last_pop = details::os::now();
291 switch (incoming_async_msg.msg_type)
292 {
293 case async_msg_type::flush:
294 _flush_requested = true;
295 break;
296
297 case async_msg_type::terminate:
298 _flush_requested = true;
299 _terminate_requested = true;
300 break;
301
302 default:
303 log_msg incoming_log_msg;
304 incoming_async_msg.fill_log_msg(incoming_log_msg);
305 _formatter->format(incoming_log_msg);
306 for (auto &s : _sinks)
307 {
308 if(s->should_log( incoming_log_msg.level))
309 {
310 s->log(incoming_log_msg);
311 }
312 }
313 }
314 return true;
315 }
316
317 // Handle empty queue..
318 // This is the only place where the queue can terminate or flush to avoid losing messages already in the queue
319 else
320 {
321 auto now = details::os::now();
322 handle_flush_interval(now, last_flush);
323 sleep_or_yield(now, last_pop);
324 return !_terminate_requested;
325 }
326 }
327
328 // flush all sinks if _flush_interval_ms has expired
handle_flush_interval(log_clock::time_point & now,log_clock::time_point & last_flush)329 inline void spdlog::details::async_log_helper::handle_flush_interval(log_clock::time_point& now, log_clock::time_point& last_flush)
330 {
331 auto should_flush = _flush_requested || (_flush_interval_ms != std::chrono::milliseconds::zero() && now - last_flush >= _flush_interval_ms);
332 if (should_flush)
333 {
334 for (auto &s : _sinks)
335 s->flush();
336 now = last_flush = details::os::now();
337 _flush_requested = false;
338 }
339 }
340
set_formatter(formatter_ptr msg_formatter)341 inline void spdlog::details::async_log_helper::set_formatter(formatter_ptr msg_formatter)
342 {
343 _formatter = msg_formatter;
344 }
345
346
347 // spin, yield or sleep. use the time passed since last message as a hint
sleep_or_yield(const spdlog::log_clock::time_point & now,const spdlog::log_clock::time_point & last_op_time)348 inline void spdlog::details::async_log_helper::sleep_or_yield(const spdlog::log_clock::time_point& now, const spdlog::log_clock::time_point& last_op_time)
349 {
350 using namespace std::this_thread;
351 using std::chrono::milliseconds;
352 using std::chrono::microseconds;
353
354 auto time_since_op = now - last_op_time;
355
356 // spin upto 50 micros
357 if (time_since_op <= microseconds(50))
358 return;
359
360 // yield upto 150 micros
361 if (time_since_op <= microseconds(100))
362 return yield();
363
364
365 // sleep for 20 ms upto 200 ms
366 if (time_since_op <= milliseconds(200))
367 return sleep_for(milliseconds(20));
368
369 // sleep for 200 ms
370 return sleep_for(milliseconds(200));
371 }
372
373 // wait for the queue to be empty
wait_empty_q()374 inline void spdlog::details::async_log_helper::wait_empty_q()
375 {
376 auto last_op = details::os::now();
377 while (_q.approx_size() > 0)
378 {
379 sleep_or_yield(details::os::now(), last_op);
380 }
381
382 }
383
384
385
386
387
388
389
390
391