diff --git a/jcstress-core/src/main/java/org/openjdk/jcstress/TestExecutor.java b/jcstress-core/src/main/java/org/openjdk/jcstress/TestExecutor.java index 6cc9b428..f72199d2 100644 --- a/jcstress-core/src/main/java/org/openjdk/jcstress/TestExecutor.java +++ b/jcstress-core/src/main/java/org/openjdk/jcstress/TestExecutor.java @@ -63,7 +63,11 @@ public class TestExecutor { private final Map<Integer, VM> vmByToken; private final Object notifyLock; + private final AtomicInteger jvmsStarting; private final AtomicInteger jvmsRunning; + private final AtomicInteger jvmsFinishing; + + private final ExecutorService supportTasks; public TestExecutor(Verbosity verbosity, TestResultCollector sink, Scheduler scheduler) throws IOException { this.verbosity = verbosity; @@ -85,7 +89,21 @@ public void onResult(int token, TestResult result) { } }); + this.jvmsStarting = new AtomicInteger(); this.jvmsRunning = new AtomicInteger(); + this.jvmsFinishing = new AtomicInteger(); + + this.supportTasks = Executors.newCachedThreadPool(new ThreadFactory() { + private final AtomicInteger id = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("jcstress-vm-support-" + id.incrementAndGet()); + t.setDaemon(true); + return t; + } + }); } private void awaitNotification() { @@ -141,7 +159,7 @@ public void runAll(List<TestConfig> configs) { int token = ID.incrementAndGet(); VM vm = new VM(server.getHost(), server.getPort(), token, cfg, cpuMap); vmByToken.put(token, vm); - vm.start(); + supportTasks.submit(vm::start); } } @@ -158,13 +176,21 @@ public void runAll(List<TestConfig> configs) { } } + supportTasks.shutdown(); + try { + supportTasks.awaitTermination(1, TimeUnit.HOURS); + } catch (InterruptedException e) { + // Do nothing + } + server.terminate(); } private boolean processReadyVMs() { boolean reclaimed = false; for (VM vm : vmByToken.values()) { - if (vm.checkCompleted(sink)) { + if (vm.checkCompleted()) { + supportTasks.submit(() -> vm.finish(sink)); vmByToken.remove(vm.token, vm); scheduler.release(vm.cpuMap); reclaimed = true; @@ -181,10 +207,18 @@ public int getSystemCpus() { return scheduler.getSystemCpus(); } + public int getJVMsStarting() { + return jvmsStarting.get(); + } + public int getJVMsRunning() { return jvmsRunning.get(); } + public int getJVMsFinishing() { + return jvmsFinishing.get(); + } + private class VM { private final String host; private final int port; @@ -196,8 +230,9 @@ private class VM { private boolean processed; private IOException pendingException; private TestResult result; - private InputStreamCollector errCollector; - private InputStreamCollector outCollector; + private Future<List<String>> errs; + private Future<List<String>> outs; + private boolean isStarted; public VM(String host, int port, int token, TestConfig task, CPUMap cpuMap) { this.host = host; @@ -205,13 +240,6 @@ public VM(String host, int port, int token, TestConfig task, CPUMap cpuMap) { this.token = token; this.cpuMap = cpuMap; this.task = task; - if (VMSupport.compilerDirectivesAvailable()) { - try { - generateDirectives(); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } } void generateDirectives() throws IOException { @@ -320,7 +348,17 @@ void generateDirectives() throws IOException { pw.close(); } - void start() { + synchronized void start() { + jvmsStarting.incrementAndGet(); + + if (VMSupport.compilerDirectivesAvailable()) { + try { + generateDirectives(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + try { List<String> command = new ArrayList<>(); @@ -354,20 +392,18 @@ void start() { ProcessBuilder pb = new ProcessBuilder(command); process = pb.start(); - jvmsRunning.incrementAndGet(); - // start the stream drainers and read the streams into memory; // makes little sense to write them to files, since we would be // reading them back soon anyway - errCollector = new InputStreamCollector(process.getErrorStream()); - outCollector = new InputStreamCollector(process.getInputStream()); - - errCollector.start(); - outCollector.start(); + errs = supportTasks.submit(new InputStreamCollector(process.getErrorStream())); + outs = supportTasks.submit(new InputStreamCollector(process.getInputStream())); } catch (IOException ex) { pendingException = ex; } + isStarted = true; + jvmsStarting.decrementAndGet(); + jvmsRunning.incrementAndGet(); } public synchronized ForkedTestConfig jobRequest() { @@ -378,13 +414,14 @@ public synchronized ForkedTestConfig jobRequest() { return new ForkedTestConfig(task); } - public synchronized boolean checkCompleted(TestResultCollector sink) { + public synchronized boolean checkCompleted() { + // Not yet started + if (!isStarted) { + return false; + } + // There is a pending exception that terminated the target VM. if (pendingException != null) { - result = new TestResult(Status.VM_ERROR); - result.addMessage(pendingException.getMessage()); - result.setConfig(task); - sink.add(result); return true; } @@ -393,15 +430,30 @@ public synchronized boolean checkCompleted(TestResultCollector sink) { return false; } + return true; + } + + public synchronized void finish(TestResultCollector sink) { + jvmsRunning.decrementAndGet(); + jvmsFinishing.incrementAndGet(); + + if (!checkCompleted()) { + throw new IllegalStateException("Should be completed"); + } + + // There is a pending exception that terminated the target VM. + if (pendingException != null) { + result = new TestResult(Status.VM_ERROR); + result.addMessage(pendingException.getMessage()); + result.setConfig(task); + sink.add(result); + return; + } + // Try to poll the exit code, and fail if it's not zero. try { int ecode = process.waitFor(); - jvmsRunning.decrementAndGet(); - - outCollector.join(); - errCollector.join(); - if (ecode != 0) { result = new TestResult(Status.VM_ERROR); result.addMessage("Failed with error code " + ecode); @@ -410,11 +462,11 @@ public synchronized boolean checkCompleted(TestResultCollector sink) { result = new TestResult(Status.VM_ERROR); result.addMessage("Harness error, no result generated"); } - result.addVMOuts(outCollector.getOutput()); - result.addVMErrs(errCollector.getOutput()); + result.addVMOuts(outs.get()); + result.addVMErrs(errs.get()); result.setConfig(task); sink.add(result); - } catch (InterruptedException ex) { + } catch (InterruptedException | ExecutionException ex) { result = new TestResult(Status.VM_ERROR); result.addMessage(ex.getMessage()); result.setConfig(task); @@ -425,7 +477,8 @@ public synchronized boolean checkCompleted(TestResultCollector sink) { compilerDirectives.delete(); } } - return true; + + jvmsFinishing.decrementAndGet(); } public synchronized void recordResult(TestResult r) { diff --git a/jcstress-core/src/main/java/org/openjdk/jcstress/infra/grading/ConsoleReportPrinter.java b/jcstress-core/src/main/java/org/openjdk/jcstress/infra/grading/ConsoleReportPrinter.java index 1785dd1c..b756d08c 100644 --- a/jcstress-core/src/main/java/org/openjdk/jcstress/infra/grading/ConsoleReportPrinter.java +++ b/jcstress-core/src/main/java/org/openjdk/jcstress/infra/grading/ConsoleReportPrinter.java @@ -147,10 +147,10 @@ private void printStatusLine() { long currentTime = System.nanoTime(); final int actorCpus = executor.getActorCpus(); final int systemCpus = executor.getSystemCpus(); - String line = String.format("(ETA: %10s) (Sample Rate: %s) (JVMs: %d running) (CPUs: %d actor, %d system, %d total) (Results: %d planned; %d passed, %d failed, %d soft errs, %d hard errs)", + String line = String.format("(ETA: %10s) (Sample Rate: %s) (JVMs: %d start, %d run, %d finish) (CPUs: %d actor, %d system, %d total) (Results: %d planned; %d passed, %d failed, %d soft errs, %d hard errs)", computeETA(), computeSpeed(), - executor.getJVMsRunning(), + executor.getJVMsStarting(), executor.getJVMsRunning(), executor.getJVMsFinishing(), actorCpus, systemCpus, actorCpus + systemCpus, expectedResults, passed, failed, softErrors, hardErrors ); diff --git a/jcstress-core/src/main/java/org/openjdk/jcstress/util/InputStreamCollector.java b/jcstress-core/src/main/java/org/openjdk/jcstress/util/InputStreamCollector.java index 3056aca8..7809210f 100644 --- a/jcstress-core/src/main/java/org/openjdk/jcstress/util/InputStreamCollector.java +++ b/jcstress-core/src/main/java/org/openjdk/jcstress/util/InputStreamCollector.java @@ -25,13 +25,11 @@ package org.openjdk.jcstress.util; import java.io.*; -import java.nio.Buffer; import java.util.ArrayList; import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; +import java.util.concurrent.Callable; -public class InputStreamCollector extends Thread { +public class InputStreamCollector implements Callable<List<String>> { private final InputStream in; private final List<String> list; @@ -41,7 +39,7 @@ public InputStreamCollector(InputStream in) { this.list = new ArrayList<>(); } - public void run() { + public List<String> call() { try (InputStreamReader isr = new InputStreamReader(in); BufferedReader br = new BufferedReader(isr)) { String line; @@ -51,9 +49,6 @@ public void run() { } catch (IOException e) { // Do nothing. } - } - - public List<String> getOutput() { return list; }