@@ -41,7 +41,7 @@ class AsyncLogWriter::AsyncLogLocker : public StackObj {
41
41
};
42
42
43
43
void AsyncLogWriter::enqueue_locked (const AsyncLogMessage& msg) {
44
- if (_buffer.size () >= _buffer_max_size) {
44
+ if (_buffer.size () >= _buffer_max_size) {
45
45
bool p_created;
46
46
uint32_t * counter = _stats.add_if_absent (msg.output (), 0 , &p_created);
47
47
*counter = *counter + 1 ;
@@ -50,13 +50,12 @@ void AsyncLogWriter::enqueue_locked(const AsyncLogMessage& msg) {
50
50
return ;
51
51
}
52
52
53
- assert (_buffer.size () < _buffer_max_size, " _buffer is over-sized." );
54
53
_buffer.push_back (msg);
55
54
_sem.signal ();
56
55
}
57
56
58
57
void AsyncLogWriter::enqueue (LogFileOutput& output, const LogDecorations& decorations, const char * msg) {
59
- AsyncLogMessage m (output, decorations, os::strdup (msg));
58
+ AsyncLogMessage m (& output, decorations, os::strdup (msg));
60
59
61
60
{ // critical area
62
61
AsyncLogLocker locker;
@@ -70,13 +69,13 @@ void AsyncLogWriter::enqueue(LogFileOutput& output, LogMessageBuffer::Iterator m
70
69
AsyncLogLocker locker;
71
70
72
71
for (; !msg_iterator.is_at_end (); msg_iterator++) {
73
- AsyncLogMessage m (output, msg_iterator.decorations (), os::strdup (msg_iterator.message ()));
72
+ AsyncLogMessage m (& output, msg_iterator.decorations (), os::strdup (msg_iterator.message ()));
74
73
enqueue_locked (m);
75
74
}
76
75
}
77
76
78
77
AsyncLogWriter::AsyncLogWriter ()
79
- : _lock(1 ), _sem(0 ), _io_sem( 1 ),
78
+ : _lock(1 ), _sem(0 ), _flush_sem( 0 ),
80
79
_initialized(false ),
81
80
_stats(17 /* table_size*/ ) {
82
81
if (os::create_thread (this , os::asynclog_thread)) {
@@ -98,10 +97,10 @@ class AsyncLogMapIterator {
98
97
using none = LogTagSetMapping<LogTag::__NO_TAG>;
99
98
100
99
if (*counter > 0 ) {
101
- LogDecorations decorations (LogLevel::Warning, none::tagset (), output-> decorators () );
100
+ LogDecorations decorations (LogLevel::Warning, none::tagset (), LogDecorators::All );
102
101
stringStream ss;
103
102
ss.print (UINT32_FORMAT_W (6 ) " messages dropped due to async logging" , *counter);
104
- AsyncLogMessage msg (* output, decorations, ss.as_string (true /* c_heap*/ ));
103
+ AsyncLogMessage msg (output, decorations, ss.as_string (true /* c_heap*/ ));
105
104
_logs.push_back (msg);
106
105
*counter = 0 ;
107
106
}
@@ -118,7 +117,6 @@ void AsyncLogWriter::write() {
118
117
// The operation 'pop_all()' is done in O(1). All I/O jobs are then performed without
119
118
// lock protection. This guarantees I/O jobs don't block logsites.
120
119
AsyncLogBuffer logs;
121
- bool own_io = false ;
122
120
123
121
{ // critical region
124
122
AsyncLogLocker locker;
@@ -127,24 +125,29 @@ void AsyncLogWriter::write() {
127
125
// append meta-messages of dropped counters
128
126
AsyncLogMapIterator dropped_counters_iter (logs);
129
127
_stats.iterate(&dropped_counters_iter);
130
- own_io = _io_sem.trywait ();
131
128
}
132
129
133
130
LinkedListIterator<AsyncLogMessage> it (logs.head ());
134
- if (!own_io) {
135
- _io_sem.wait ();
136
- }
137
131
132
+ int req = 0 ;
138
133
while (!it.is_empty ()) {
139
134
AsyncLogMessage* e = it.next ();
140
135
char * msg = e->message ();
141
136
142
137
if (msg != nullptr ) {
143
138
e->output ()->write_blocking (e->decorations (), msg);
144
139
os::free (msg);
140
+ } else if (e->output () == nullptr ) {
141
+ // This is a flush token. Record that we found it and then
142
+ // signal the flushing thread after the loop.
143
+ req++;
145
144
}
146
145
}
147
- _io_sem.signal ();
146
+
147
+ if (req > 0 ) {
148
+ assert (req == 1 , " AsyncLogWriter::flush() is NOT MT-safe!" );
149
+ _flush_sem.signal (req);
150
+ }
148
151
}
149
152
150
153
void AsyncLogWriter::run () {
@@ -181,10 +184,23 @@ AsyncLogWriter* AsyncLogWriter::instance() {
181
184
return _instance;
182
185
}
183
186
184
- // write() acquires and releases _io_sem even _buffer is empty.
185
- // This guarantees all logging I/O of dequeued messages are done when it returns.
187
+ // Inserts a flush token into the async output buffer and waits until the AsyncLog thread
188
+ // signals that it has seen it and completed all dequeued message processing.
189
+ // This method is not MT-safe in itself, but is guarded by another lock in the usual
190
+ // usecase - see the comments in the header file for more details.
186
191
void AsyncLogWriter::flush () {
187
192
if (_instance != nullptr ) {
188
- _instance->write ();
193
+ {
194
+ using none = LogTagSetMapping<LogTag::__NO_TAG>;
195
+ AsyncLogLocker locker;
196
+ LogDecorations d (LogLevel::Off, none::tagset (), LogDecorators::None);
197
+ AsyncLogMessage token (nullptr , d, nullptr );
198
+
199
+ // Push directly in-case we are at logical max capacity, as this must not get dropped.
200
+ _instance->_buffer .push_back (token);
201
+ _instance->_sem .signal ();
202
+ }
203
+
204
+ _instance->_flush_sem .wait ();
189
205
}
190
206
}
0 commit comments