Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support yield on virtual thread on EPollSelector select() #166

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java
Original file line number Diff line number Diff line change
@@ -116,11 +116,18 @@ protected int doSelect(Consumer<SelectionKey> action, long timeout)

do {
long startTime = timedPoll ? System.nanoTime() : 0;
long comp = Blocker.begin(blocking);
try {
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
} finally {
Blocker.end(comp);
if (Poller.useRecursivePoll() && Thread.currentThread().isVirtual()) {
if (timeout != 0) {
Poller.poll(epfd, Net.POLLIN, TimeUnit.MILLISECONDS.toNanos(timeout), this::isOpen);
}
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, 0);
} else {
long comp = Blocker.begin();
try {
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
} finally {
Blocker.end(comp);
}
}
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
11 changes: 11 additions & 0 deletions src/java.base/share/classes/sun/nio/ch/Poller.java
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ public abstract class Poller {
private static final Poller[] WRITE_POLLERS;
private static final int READ_MASK, WRITE_MASK;
private static final boolean USE_DIRECT_REGISTER;
private static final boolean USE_RECURSIVE_POLL;

// true if this is a poller for reading, false for writing
private final boolean read;
@@ -388,6 +389,12 @@ int fdVal() {
} else {
USE_DIRECT_REGISTER = "".equals(s) || Boolean.parseBoolean(s);
}
s = GetPropertyAction.privilegedGetProperty("jdk.useRecursivePoll");
if (s == null) {
USE_RECURSIVE_POLL = provider.useRecursivePoll();
} else {
USE_RECURSIVE_POLL = "".equals(s) || Boolean.parseBoolean(s);
}
try {
Poller[] readPollers = createReadPollers(provider);
READ_POLLERS = readPollers;
@@ -466,4 +473,8 @@ public static Stream<Thread> blockedThreads() {
private Stream<Thread> registeredThreads() {
return map.values().stream();
}

static boolean useRecursivePoll() {
return USE_RECURSIVE_POLL;
}
}
4 changes: 4 additions & 0 deletions src/java.base/share/classes/sun/nio/ch/PollerProvider.java
Original file line number Diff line number Diff line change
@@ -41,6 +41,10 @@ boolean useDirectRegister() {
return false;
}

boolean useRecursivePoll() {
return false;
}

/**
* Creates a Poller for read ops.
*/
25 changes: 19 additions & 6 deletions src/java.base/share/classes/sun/nio/ch/SelectorImpl.java
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;


@@ -67,6 +68,9 @@ public abstract class SelectorImpl
// used to check for reentrancy
private boolean inSelect;

private final ReentrantLock selectorLock = new ReentrantLock();
private final ReentrantLock publicSelectedKeysLock = new ReentrantLock();

protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = ConcurrentHashMap.newKeySet();
@@ -119,18 +123,21 @@ protected abstract int doSelect(Consumer<SelectionKey> action, long timeout)
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
selectorLock.lock();
try {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
publicSelectedKeysLock.lock();
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
return doSelect(action, timeout);
} finally {
publicSelectedKeysLock.unlock();
inSelect = false;
}
} finally {
selectorLock.unlock();
}
}

@@ -181,9 +188,11 @@ public final int selectNow(Consumer<SelectionKey> action) throws IOException {
@Override
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
selectorLock.lock();
try {
implClose();
synchronized (publicSelectedKeys) {
publicSelectedKeysLock.lock();
try {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
@@ -196,7 +205,11 @@ public final void implCloseSelector() throws IOException {
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
} finally {
publicSelectedKeysLock.unlock();
}
} finally {
selectorLock.unlock();
}
}

158 changes: 158 additions & 0 deletions test/jdk/java/lang/Thread/virtual/Selectors.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

/**
* @test
* @summary Basic tests for virtual threads doing blocking Selector select
* @requires (os.family == "linux")
* @compile --enable-preview --add-exports=java.base/sun.nio.ch=ALL-UNNAMED -source ${jdk.version} Selectors.java
* @run testng/othervm/timeout=300 --enable-preview --add-exports=java.base/sun.nio.ch=ALL-UNNAMED Selectors
* @run testng/othervm/timeout=300 --enable-preview --add-exports=java.base/sun.nio.ch=ALL-UNNAMED -Djdk.useRecursivePoll=true Selectors
* @run testng/othervm/timeout=300 --enable-preview --add-exports=java.base/sun.nio.ch=ALL-UNNAMED -Djdk.useRecursivePoll=true -Djdk.useDirectRegister Selectors
*/

import org.testng.annotations.Test;

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import sun.nio.ch.Poller;
import static org.testng.Assert.*;


public class Selectors {
@Test
public void testSelectorMounted() throws Exception {
var selectorThread = Thread.ofVirtual().start(() -> {
try {
Selector selector = Selector.open();
selector.select();
} catch (Exception ignored) {
}
});
Thread.sleep(200);
assertEquals(selectorThread.getState(),
(Boolean.parseBoolean(System.getProperty("jdk.useRecursivePoll"))? Thread.State.WAITING : Thread.State.RUNNABLE));
selectorThread.interrupt();
selectorThread.join();
}

@Test
public void testSelectorWakeup() throws Exception {
var selectorSet = new CountDownLatch(1);
var wakened = new CountDownLatch(1);
var selector = new AtomicReference<Selector>();
var selectorThread = Thread.ofVirtual().start(() -> {
try {
selector.set(Selector.open());
selectorSet.countDown();
selector.get().select();
wakened.countDown();
} catch (Exception ignored) {
}
});
selectorSet.await();
selector.get().wakeup();
wakened.await();
selectorThread.join();
}

@Test
public void testSelectorInterrupt() throws Exception {
var wakened = new CountDownLatch(1);
var selector = new AtomicReference<Selector>();
var exception = new AtomicReference<Exception>();
var selectorThread = Thread.ofVirtual().start(() -> {
try {
selector.set(Selector.open());
selector.get().select();
assertTrue(Thread.currentThread().isInterrupted());
wakened.countDown();
} catch (Exception e) {
exception.set(e);
}
});
Thread.sleep(100); // give time for thread to block
selectorThread.interrupt();
wakened.await();
assertTrue(exception.get() == null);
selectorThread.join();
}

@Test
public void testSelectNow() throws Exception {
var selector = Selector.open();
var p = SelectorProvider.provider().openPipe();
var sink = p.sink();
var source = p.source();
source.configureBlocking(false);
sink.configureBlocking(false);
var selectResult = new AtomicReference<Integer>();
var exception = new AtomicReference<Exception>();

// selectNow return expected result
Thread.ofVirtual().start(() -> {
try {
selectResult.set(selector.selectNow());
} catch (Exception e) {
exception.set(e);
}
}).join();

assertTrue(exception.get() == null);
assertTrue(selectResult.get() == 0);

var readKey = source.register(selector, SelectionKey.OP_READ);
var writeBuffer = ByteBuffer.allocateDirect(128);
writeBuffer.put("helloworld".getBytes());
sink.write(writeBuffer);

Thread.ofVirtual().start(() -> {
try {
selectResult.set(selector.selectNow());
} catch (Exception e) {
exception.set(e);
}
}).join();

assertTrue(exception.get() == null);
assertTrue(selectResult.get() == 1);
}

@Test
public void testSelectWithTimeout() throws Exception {
// timed select wakeup eventually
var exception = new AtomicReference<Exception>();
Thread.ofVirtual().start(() -> {
try {
Selector.open().select(1000);
} catch (Exception e) {
exception.set(e);
}
}).join();
}

}
6 changes: 6 additions & 0 deletions test/jdk/java/net/vthread/HttpALot.java
Original file line number Diff line number Diff line change
@@ -31,6 +31,12 @@
* -Dsun.net.client.defaultConnectTimeout=5000
* -Dsun.net.client.defaultReadTimeout=5000
* HttpALot
* @run main/othervm/timeout=600
* --enable-preview
* -Dsun.net.client.defaultConnectTimeout=5000
* -Dsun.net.client.defaultReadTimeout=5000
* -Djdk.useRecursivePoll=true
* HttpALot
*/

import java.io.IOException;
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
/* @test
* @bug 8201315
* @build SelectorUtils
* @run main RegisterDuringSelect
* @run main/othervm --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED RegisterDuringSelect
* @summary Test that channels can be registered, interest ops can changed,
* and keys cancelled while a selection operation is in progress.
*/
2 changes: 1 addition & 1 deletion test/jdk/java/nio/channels/Selector/SelectAndClose.java
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
/* @test
* @bug 5004077 8203765
* @build SelectorUtils
* @run main SelectAndClose
* @run main/othervm --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED SelectAndClose
* @summary Check blocking of select and close
*/

17 changes: 7 additions & 10 deletions test/jdk/java/nio/channels/Selector/SelectWithConsumer.java
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
/* @test
* @summary Unit test for Selector.select/selectNow(Consumer)
* @bug 8199433 8208780
* @run testng SelectWithConsumer
* @run testng/othervm --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED SelectWithConsumer
*/

/* @test
@@ -518,25 +518,22 @@ public void testLocks() throws Exception {
// select(Consumer)
sel.select(k -> {
assertTrue(k == key);
assertTrue(Thread.holdsLock(sel));
assertFalse(Thread.holdsLock(sel.keys()));
assertTrue(Thread.holdsLock(sel.selectedKeys()));
assertTrue(SelectorUtils.mightHoldKeysLock(Thread.currentThread(), sel));
assertTrue(SelectorUtils.mightHoldSelectorLock(Thread.currentThread(), sel));
});

// select(Consumer, timeout)
sel.select(k -> {
assertTrue(k == key);
assertTrue(Thread.holdsLock(sel));
assertFalse(Thread.holdsLock(sel.keys()));
assertTrue(Thread.holdsLock(sel.selectedKeys()));
assertTrue(SelectorUtils.mightHoldKeysLock(Thread.currentThread(), sel));
assertTrue(SelectorUtils.mightHoldSelectorLock(Thread.currentThread(), sel));
}, 1000L);

// selectNow(Consumer)
sel.selectNow(k -> {
assertTrue(k == key);
assertTrue(Thread.holdsLock(sel));
assertFalse(Thread.holdsLock(sel.keys()));
assertTrue(Thread.holdsLock(sel.selectedKeys()));
assertTrue(SelectorUtils.mightHoldKeysLock(Thread.currentThread(), sel));
assertTrue(SelectorUtils.mightHoldSelectorLock(Thread.currentThread(), sel));
});
} finally {
closePipe(p);
59 changes: 41 additions & 18 deletions test/jdk/java/nio/channels/Selector/SelectorUtils.java
Original file line number Diff line number Diff line change
@@ -24,30 +24,53 @@
import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.channels.Selector;
import java.util.concurrent.locks.ReentrantLock;

public class SelectorUtils {

/**
* tell if the monitor of an Object is held by a Thread.
* @param t the Thread to hold the monitor of the selected-key set
* @param lock the Object
* @return
*/
public static boolean mightHoldLock(Thread t, Object lock) {
long tid = t.getId();
int hash = System.identityHashCode(lock);
ThreadInfo ti = ManagementFactory.getThreadMXBean().
getThreadInfo(new long[]{ tid} , true, false, 100)[0];
if (ti != null) {
for (MonitorInfo mi : ti.getLockedMonitors()) {
if (mi.getIdentityHashCode() == hash)
return true;
}
private static Field SELECTOR_LOCK;
private static Field SELECTOR_SELECTEDKEY_LOCK;
private static Method OWNER;

static {
try {
SELECTOR_LOCK = Class.forName("sun.nio.ch.SelectorImpl").getDeclaredField("selectorLock");
SELECTOR_LOCK.setAccessible(true);
SELECTOR_SELECTEDKEY_LOCK = Class.forName("sun.nio.ch.SelectorImpl").getDeclaredField("publicSelectedKeys");
SELECTOR_SELECTEDKEY_LOCK.setAccessible(true);
OWNER = ReentrantLock.class.getDeclaredMethod("getOwner");
OWNER.setAccessible(true);
} catch (Exception e) {
throw new InternalError(e);
}
}

public static boolean mightHoldSelectorLock(Thread t, Object selector) {
try {
ReentrantLock lock = (ReentrantLock) SELECTOR_LOCK.get(selector);
if (lock == null)
return false;
return OWNER.invoke(lock) == t;
} catch (Exception e) {
return false;
}
return false;
}

public static boolean mightHoldKeysLock(Thread t, Object selector) {
try {
ReentrantLock lock = (ReentrantLock) SELECTOR_LOCK.get(selector);
if (lock == null)
return false;
return OWNER.invoke(lock) == t;
} catch (Exception e) {
return false;
}
}


/**
* Spin until the monitor of the selected-key set is likely held
* as selected operations are specified to synchronize on the
@@ -57,7 +80,7 @@ public static boolean mightHoldLock(Thread t, Object lock) {
* @throws Exception
*/
public static void spinUntilLocked(Thread t, Selector sel) throws Exception {
while (!mightHoldLock(t, sel.selectedKeys())) {
while (!mightHoldSelectorLock(t, sel)) {
Thread.sleep(50);
}
}