45
45
import jdk .jfr .internal .SecuritySupport ;
46
46
import jdk .jfr .internal .Utils ;
47
47
import jdk .jfr .internal .consumer .EventDirectoryStream ;
48
- import jdk .jfr .internal .consumer .JdkJfrConsumer ;
49
48
50
49
/**
51
50
* A recording stream produces events from the current JVM (Java Virtual
68
67
*/
69
68
public final class RecordingStream implements AutoCloseable , EventStream {
70
69
70
+ final static class ChunkConsumer implements Consumer <Long > {
71
+
72
+ private final Recording recording ;
73
+
74
+ ChunkConsumer (Recording recording ) {
75
+ this .recording = recording ;
76
+ }
77
+
78
+ @ Override
79
+ public void accept (Long endNanos ) {
80
+ Instant t = Utils .epochNanosToInstant (endNanos );
81
+ PlatformRecording p = PrivateAccess .getInstance ().getPlatformRecording (recording );
82
+ p .removeBefore (t );
83
+ }
84
+ }
85
+
71
86
private final Recording recording ;
72
87
private final Instant creationTime ;
73
88
private final EventDirectoryStream directoryStream ;
89
+ private long maxSize ;
90
+ private Duration maxAge ;
74
91
75
92
/**
76
93
* Creates an event stream for the current JVM (Java Virtual Machine).
@@ -247,7 +264,11 @@ public EventSettings disable(Class<? extends Event> eventClass) {
247
264
* state
248
265
*/
249
266
public void setMaxAge (Duration maxAge ) {
250
- recording .setMaxAge (maxAge );
267
+ synchronized (directoryStream ) {
268
+ recording .setMaxAge (maxAge );
269
+ this .maxAge = maxAge ;
270
+ updateOnCompleteHandler ();
271
+ }
251
272
}
252
273
253
274
/**
@@ -270,7 +291,11 @@ public void setMaxAge(Duration maxAge) {
270
291
* @throws IllegalStateException if the recording is in {@code CLOSED} state
271
292
*/
272
293
public void setMaxSize (long maxSize ) {
273
- recording .setMaxSize (maxSize );
294
+ synchronized (directoryStream ) {
295
+ recording .setMaxSize (maxSize );
296
+ this .maxSize = maxSize ;
297
+ updateOnCompleteHandler ();
298
+ }
274
299
}
275
300
276
301
@ Override
@@ -320,6 +345,7 @@ public void onError(Consumer<Throwable> action) {
320
345
321
346
@ Override
322
347
public void close () {
348
+ directoryStream .setChunkCompleteHandler (null );
323
349
recording .close ();
324
350
directoryStream .close ();
325
351
}
@@ -333,6 +359,7 @@ public boolean remove(Object action) {
333
359
public void start () {
334
360
PlatformRecording pr = PrivateAccess .getInstance ().getPlatformRecording (recording );
335
361
long startNanos = pr .start ();
362
+ updateOnCompleteHandler ();
336
363
directoryStream .start (startNanos );
337
364
}
338
365
@@ -363,6 +390,7 @@ public void start() {
363
390
public void startAsync () {
364
391
PlatformRecording pr = PrivateAccess .getInstance ().getPlatformRecording (recording );
365
392
long startNanos = pr .start ();
393
+ updateOnCompleteHandler ();
366
394
directoryStream .startAsync (startNanos );
367
395
}
368
396
@@ -380,4 +408,13 @@ public void awaitTermination() throws InterruptedException {
380
408
public void onMetadata (Consumer <MetadataEvent > action ) {
381
409
directoryStream .onMetadata (action );
382
410
}
411
+
412
+ private void updateOnCompleteHandler () {
413
+ if (maxAge != null || maxSize != 0 ) {
414
+ // User has set a chunk removal policy
415
+ directoryStream .setChunkCompleteHandler (null );
416
+ } else {
417
+ directoryStream .setChunkCompleteHandler (new ChunkConsumer (recording ));
418
+ }
419
+ }
383
420
}
0 commit comments