@@ -80,7 +80,7 @@ public TestConfig onJobRequest(String token) {
80
80
81
81
@ Override
82
82
public void onResult (String token , TestResult result ) {
83
- sink . add (result );
83
+ vmByToken . get ( token ). recordResult (result );
84
84
}
85
85
}) : null ;
86
86
embeddedExecutor = new EmbeddedExecutor (sink , cpuLayout );
@@ -127,19 +127,10 @@ private List<Integer> acquireCPUs(int cpus) {
127
127
128
128
private void processReadyVMs () {
129
129
for (VM vm : vmByToken .values ()) {
130
- try {
131
- if (!vm .checkTermination ()) continue ;
132
- } catch (ForkFailedException e ) {
133
- TestConfig task = vm .getTask ();
134
- TestResult result = new TestResult (task , Status .VM_ERROR );
135
- for (String i : e .getInfo ()) {
136
- result .addAuxData (i );
137
- }
138
- sink .add (result );
130
+ if (vm .checkCompleted (sink )) {
131
+ vmByToken .remove (vm .token , vm );
132
+ cpuLayout .release (vm .claimedCPUs );
139
133
}
140
-
141
- vmByToken .remove (vm .token , vm );
142
- cpuLayout .release (vm .claimedCPUs );
143
134
}
144
135
}
145
136
@@ -155,6 +146,7 @@ private static class VM {
155
146
private Process process ;
156
147
private boolean processed ;
157
148
private IOException pendingException ;
149
+ private TestResult result ;
158
150
159
151
public VM (String host , int port , String token , TestConfig task , List <Integer > claimedCPUs ) {
160
152
this .host = host ;
@@ -258,52 +250,82 @@ void start() {
258
250
}
259
251
}
260
252
261
- boolean checkTermination () {
253
+ public synchronized TestConfig jobRequest () {
254
+ if (processed ) {
255
+ return null ;
256
+ }
257
+ processed = true ;
258
+ return getTask ();
259
+ }
260
+
261
+ public TestConfig getTask () {
262
+ return task ;
263
+ }
264
+
265
+ public boolean checkCompleted (TestResultCollector sink ) {
266
+ // There is a pending exception that terminated the target VM.
262
267
if (pendingException != null ) {
263
- throw new ForkFailedException (pendingException .getMessage ());
268
+ dumpFailure (sink , Collections .singleton (pendingException .getMessage ()), Collections .emptyList ());
269
+ return true ;
264
270
}
265
271
272
+ // Process is still alive, no need to ask about the status.
266
273
if (process .isAlive ()) {
267
274
return false ;
268
- } else {
269
- // Try to poll the exit code, and fail if it's not zero.
275
+ }
276
+
277
+ // Try to poll the exit code, and fail if it's not zero.
278
+ try {
279
+ int ecode = process .waitFor ();
280
+
281
+ List <String > out = new ArrayList <>();
270
282
try {
271
- int ecode = process .waitFor ();
272
- if (ecode != 0 ) {
273
- List <String > output = new ArrayList <>();
274
- try {
275
- output .addAll (Files .readAllLines (stdout .toPath ()));
276
- } catch (IOException e ) {
277
- output .add ("Failed to read stdout: " + e .getMessage ());
278
- }
279
- try {
280
- output .addAll (Files .readAllLines (stderr .toPath ()));
281
- } catch (IOException e ) {
282
- output .add ("Failed to read stderr: " + e .getMessage ());
283
- }
284
- throw new ForkFailedException (output );
285
- }
286
- } catch (InterruptedException ex ) {
287
- throw new ForkFailedException (ex .getMessage ());
288
- } finally {
289
- // The process is definitely dead, remove the temporary files.
290
- stdout .delete ();
291
- stderr .delete ();
283
+ out .addAll (Files .readAllLines (stdout .toPath ()));
284
+ } catch (IOException e ) {
285
+ out .add ("Failed to read stdout: " + e .getMessage ());
292
286
}
293
- return true ;
287
+
288
+ List <String > err = new ArrayList <>();
289
+ try {
290
+ err .addAll (Files .readAllLines (stderr .toPath ()));
291
+ } catch (IOException e ) {
292
+ err .add ("Failed to read stderr: " + e .getMessage ());
293
+ }
294
+
295
+ if (ecode != 0 ) {
296
+ dumpFailure (sink , out , err );
297
+ } else {
298
+ result .addVMOut (out );
299
+ result .addVMErr (err );
300
+ sink .add (result );
301
+ }
302
+ } catch (InterruptedException ex ) {
303
+ dumpFailure (sink , Collections .singleton (ex .getMessage ()), Collections .emptyList ());
304
+ } finally {
305
+ // The process is definitely dead, remove the temporary files.
306
+ stdout .delete ();
307
+ stderr .delete ();
294
308
}
309
+ return true ;
295
310
}
296
311
297
- public synchronized TestConfig jobRequest () {
298
- if (processed ) {
299
- return null ;
312
+ private void dumpFailure (TestResultCollector sink , Collection <String > out , Collection <String > err ) {
313
+ TestConfig task = getTask ();
314
+ TestResult result = new TestResult (task , Status .VM_ERROR );
315
+ for (String i : out ) {
316
+ result .addMessage (i );
300
317
}
301
- processed = true ;
302
- return getTask ();
318
+ for (String i : err ) {
319
+ result .addMessage (i );
320
+ }
321
+ sink .add (result );
303
322
}
304
323
305
- public TestConfig getTask () {
306
- return task ;
324
+ public void recordResult (TestResult r ) {
325
+ if (result != null ) {
326
+ throw new IllegalStateException ("VM had already published a result." );
327
+ }
328
+ result = r ;
307
329
}
308
330
}
309
331
0 commit comments