Skip to content

Commit 2dbb936

Browse files
author
Alan Bateman
committedJan 6, 2022
8279339: (ch) Input/Output streams returned by Channels factory methods don't support concurrent read/write ops
Reviewed-by: lancea, bpb
1 parent 456bd1e commit 2dbb936

File tree

8 files changed

+791
-156
lines changed

8 files changed

+791
-156
lines changed
 

‎src/java.base/share/classes/java/nio/channels/Channels.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2000, 2022, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -40,8 +40,6 @@
4040
import java.nio.channels.spi.AbstractInterruptibleChannel;
4141
import java.util.Objects;
4242
import java.util.concurrent.ExecutionException;
43-
import sun.nio.ch.ChannelInputStream;
44-
import sun.nio.ch.ChannelOutputStream;
4543
import sun.nio.cs.StreamDecoder;
4644
import sun.nio.cs.StreamEncoder;
4745

@@ -87,7 +85,7 @@ public final class Channels {
8785
*/
8886
public static InputStream newInputStream(ReadableByteChannel ch) {
8987
Objects.requireNonNull(ch, "ch");
90-
return new ChannelInputStream(ch);
88+
return sun.nio.ch.Streams.of(ch);
9189
}
9290

9391
/**
@@ -106,7 +104,7 @@ public static InputStream newInputStream(ReadableByteChannel ch) {
106104
*/
107105
public static OutputStream newOutputStream(WritableByteChannel ch) {
108106
Objects.requireNonNull(ch, "ch");
109-
return new ChannelOutputStream(ch);
107+
return sun.nio.ch.Streams.of(ch);
110108
}
111109

112110
/**

‎src/java.base/share/classes/sun/nio/ch/ChannelInputStream.java

+62-56
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2001, 2021, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2001, 2022, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -40,58 +40,52 @@
4040
import jdk.internal.util.ArraysSupport;
4141

4242
/**
43-
* This class is defined here rather than in java.nio.channels.Channels
44-
* so that code can be shared with SocketAdaptor.
43+
* An InputStream that reads bytes from a channel.
4544
*
4645
* @author Mike McCloskey
4746
* @author Mark Reinhold
48-
* @since 1.4
4947
*/
50-
51-
public class ChannelInputStream
52-
extends InputStream
53-
{
48+
class ChannelInputStream extends InputStream {
5449
private static final int DEFAULT_BUFFER_SIZE = 8192;
5550

56-
public static int read(ReadableByteChannel ch, ByteBuffer bb,
57-
boolean block)
58-
throws IOException
59-
{
51+
private final ReadableByteChannel ch;
52+
private ByteBuffer bb;
53+
private byte[] bs; // Invoker's previous array
54+
private byte[] b1;
55+
56+
/**
57+
* Initialize a ChannelInputStream that reads from the given channel.
58+
*/
59+
ChannelInputStream(ReadableByteChannel ch) {
60+
this.ch = ch;
61+
}
62+
63+
/**
64+
* Reads a sequence of bytes from the channel into the given buffer.
65+
*/
66+
private int read(ByteBuffer bb) throws IOException {
6067
if (ch instanceof SelectableChannel sc) {
6168
synchronized (sc.blockingLock()) {
62-
boolean bm = sc.isBlocking();
63-
if (!bm)
69+
if (!sc.isBlocking())
6470
throw new IllegalBlockingModeException();
65-
if (bm != block)
66-
sc.configureBlocking(block);
67-
int n = ch.read(bb);
68-
if (bm != block)
69-
sc.configureBlocking(bm);
70-
return n;
71+
return ch.read(bb);
7172
}
7273
} else {
7374
return ch.read(bb);
7475
}
7576
}
7677

77-
protected final ReadableByteChannel ch;
78-
private ByteBuffer bb = null;
79-
private byte[] bs = null; // Invoker's previous array
80-
private byte[] b1 = null;
81-
82-
public ChannelInputStream(ReadableByteChannel ch) {
83-
this.ch = ch;
84-
}
85-
78+
@Override
8679
public synchronized int read() throws IOException {
8780
if (b1 == null)
8881
b1 = new byte[1];
89-
int n = this.read(b1);
82+
int n = read(b1);
9083
if (n == 1)
9184
return b1[0] & 0xff;
9285
return -1;
9386
}
9487

88+
@Override
9589
public synchronized int read(byte[] bs, int off, int len)
9690
throws IOException
9791
{
@@ -109,12 +103,6 @@ public synchronized int read(byte[] bs, int off, int len)
109103
return read(bb);
110104
}
111105

112-
protected int read(ByteBuffer bb)
113-
throws IOException
114-
{
115-
return ChannelInputStream.read(ch, bb, true);
116-
}
117-
118106
@Override
119107
public byte[] readAllBytes() throws IOException {
120108
if (!(ch instanceof SeekableByteChannel sbc))
@@ -201,6 +189,7 @@ public byte[] readNBytes(int len) throws IOException {
201189
return (capacity == nread) ? buf : Arrays.copyOf(buf, nread);
202190
}
203191

192+
@Override
204193
public int available() throws IOException {
205194
// special case where the channel is to a file
206195
if (ch instanceof SeekableByteChannel sbc) {
@@ -210,6 +199,7 @@ public int available() throws IOException {
210199
return 0;
211200
}
212201

202+
@Override
213203
public synchronized long skip(long n) throws IOException {
214204
// special case where the channel is to a file
215205
if (ch instanceof SeekableByteChannel sbc) {
@@ -230,46 +220,62 @@ public synchronized long skip(long n) throws IOException {
230220
return super.skip(n);
231221
}
232222

233-
public void close() throws IOException {
234-
ch.close();
235-
}
236-
237223
@Override
238224
public long transferTo(OutputStream out) throws IOException {
239225
Objects.requireNonNull(out, "out");
240226

241-
if (out instanceof ChannelOutputStream cos
242-
&& ch instanceof FileChannel fc) {
243-
WritableByteChannel wbc = cos.channel();
244-
245-
if (wbc instanceof FileChannel dst) {
246-
return transfer(fc, dst);
247-
}
248-
249-
if (wbc instanceof SelectableChannel sc) {
227+
if (ch instanceof FileChannel fc) {
228+
// FileChannel -> SocketChannel
229+
if (out instanceof SocketOutputStream sos) {
230+
SocketChannelImpl sc = sos.channel();
250231
synchronized (sc.blockingLock()) {
251232
if (!sc.isBlocking())
252233
throw new IllegalBlockingModeException();
253-
return transfer(fc, wbc);
234+
return transfer(fc, sc);
254235
}
255236
}
256237

257-
return transfer(fc, wbc);
238+
// FileChannel -> WritableByteChannel
239+
if (out instanceof ChannelOutputStream cos) {
240+
WritableByteChannel wbc = cos.channel();
241+
242+
if (wbc instanceof SelectableChannel sc) {
243+
synchronized (sc.blockingLock()) {
244+
if (!sc.isBlocking())
245+
throw new IllegalBlockingModeException();
246+
return transfer(fc, wbc);
247+
}
248+
}
249+
250+
return transfer(fc, wbc);
251+
}
258252
}
259253

260254
return super.transferTo(out);
261255
}
262256

263-
private static long transfer(FileChannel src, WritableByteChannel dst) throws IOException {
264-
long initialPos = src.position();
257+
/**
258+
* Transfers all bytes from a channel's file to a target writeable byte channel.
259+
* If the writeable byte channel is a selectable channel then it must be in
260+
* blocking mode.
261+
*/
262+
private static long transfer(FileChannel fc, WritableByteChannel target)
263+
throws IOException
264+
{
265+
long initialPos = fc.position();
265266
long pos = initialPos;
266267
try {
267-
while (pos < src.size()) {
268-
pos += src.transferTo(pos, Long.MAX_VALUE, dst);
268+
while (pos < fc.size()) {
269+
pos += fc.transferTo(pos, Long.MAX_VALUE, target);
269270
}
270271
} finally {
271-
src.position(pos);
272+
fc.position(pos);
272273
}
273274
return pos - initialPos;
274275
}
276+
277+
@Override
278+
public void close() throws IOException {
279+
ch.close();
280+
}
275281
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -25,68 +25,30 @@
2525

2626
package sun.nio.ch;
2727

28-
import java.io.*;
29-
import java.nio.*;
30-
import java.nio.channels.*;
31-
import java.nio.channels.spi.*;
28+
import java.io.IOException;
29+
import java.io.OutputStream;
30+
import java.nio.ByteBuffer;
31+
import java.nio.channels.IllegalBlockingModeException;
32+
import java.nio.channels.SelectableChannel;
33+
import java.nio.channels.WritableByteChannel;
3234
import java.util.Objects;
3335

3436
/**
35-
* This class is defined here rather than in java.nio.channels.Channels
36-
* so that it will be accessible from java.nio.channels.Channels and
37-
* sun.nio.ch.ChannelInputStream.
38-
*
37+
* An OutputStream that writes bytes to a channel.
3938
*
4039
* @author Mark Reinhold
4140
* @author Mike McCloskey
42-
* @author JSR-51 Expert Group
43-
* @since 18
4441
*/
45-
public class ChannelOutputStream extends OutputStream {
46-
42+
class ChannelOutputStream extends OutputStream {
4743
private final WritableByteChannel ch;
4844
private ByteBuffer bb;
4945
private byte[] bs; // Invoker's previous array
5046
private byte[] b1;
5147

5248
/**
53-
* Write all remaining bytes in buffer to the given channel.
54-
* If the channel is selectable then it must be configured blocking.
55-
*/
56-
private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb)
57-
throws IOException
58-
{
59-
while (bb.remaining() > 0) {
60-
int n = ch.write(bb);
61-
if (n <= 0)
62-
throw new RuntimeException("no bytes written");
63-
}
64-
}
65-
66-
/**
67-
* Write all remaining bytes in buffer to the given channel.
68-
*
69-
* @throws IllegalBlockingModeException
70-
* If the channel is selectable and configured non-blocking.
71-
*/
72-
private static void writeFully(WritableByteChannel ch, ByteBuffer bb)
73-
throws IOException
74-
{
75-
if (ch instanceof SelectableChannel sc) {
76-
synchronized (sc.blockingLock()) {
77-
if (!sc.isBlocking())
78-
throw new IllegalBlockingModeException();
79-
writeFullyImpl(ch, bb);
80-
}
81-
} else {
82-
writeFullyImpl(ch, bb);
83-
}
84-
}
85-
86-
/**
87-
* @param ch The channel wrapped by this stream.
49+
* Initialize a ChannelOutputStream that writes to the given channel.
8850
*/
89-
public ChannelOutputStream(WritableByteChannel ch) {
51+
ChannelOutputStream(WritableByteChannel ch) {
9052
this.ch = ch;
9153
}
9254

@@ -97,17 +59,30 @@ WritableByteChannel channel() {
9759
return ch;
9860
}
9961

62+
/**
63+
* Write all remaining bytes in buffer to the channel.
64+
* If the channel is selectable then it must be configured blocking.
65+
*/
66+
private void writeFully(ByteBuffer bb) throws IOException {
67+
while (bb.remaining() > 0) {
68+
int n = ch.write(bb);
69+
if (n <= 0)
70+
throw new RuntimeException("no bytes written");
71+
}
72+
}
73+
10074
@Override
10175
public synchronized void write(int b) throws IOException {
10276
if (b1 == null)
10377
b1 = new byte[1];
10478
b1[0] = (byte) b;
105-
this.write(b1);
79+
write(b1);
10680
}
10781

10882
@Override
10983
public synchronized void write(byte[] bs, int off, int len)
110-
throws IOException {
84+
throws IOException
85+
{
11186
Objects.checkFromIndexSize(off, len, bs.length);
11287
if (len == 0) {
11388
return;
@@ -119,12 +94,20 @@ public synchronized void write(byte[] bs, int off, int len)
11994
bb.position(off);
12095
this.bb = bb;
12196
this.bs = bs;
122-
writeFully(ch, bb);
97+
98+
if (ch instanceof SelectableChannel sc) {
99+
synchronized (sc.blockingLock()) {
100+
if (!sc.isBlocking())
101+
throw new IllegalBlockingModeException();
102+
writeFully(bb);
103+
}
104+
} else {
105+
writeFully(bb);
106+
}
123107
}
124108

125109
@Override
126110
public void close() throws IOException {
127111
ch.close();
128112
}
129-
130113
}

‎src/java.base/share/classes/sun/nio/ch/SocketAdaptor.java

+3-42
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2000, 2022, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -177,32 +177,7 @@ public InputStream getInputStream() throws IOException {
177177
throw new SocketException("Socket is not connected");
178178
if (!sc.isInputOpen())
179179
throw new SocketException("Socket input is shutdown");
180-
return new InputStream() {
181-
@Override
182-
public int read() throws IOException {
183-
byte[] a = new byte[1];
184-
int n = read(a, 0, 1);
185-
return (n > 0) ? (a[0] & 0xff) : -1;
186-
}
187-
@Override
188-
public int read(byte[] b, int off, int len) throws IOException {
189-
int timeout = SocketAdaptor.this.timeout;
190-
if (timeout > 0) {
191-
long nanos = MILLISECONDS.toNanos(timeout);
192-
return sc.blockingRead(b, off, len, nanos);
193-
} else {
194-
return sc.blockingRead(b, off, len, 0);
195-
}
196-
}
197-
@Override
198-
public int available() throws IOException {
199-
return sc.available();
200-
}
201-
@Override
202-
public void close() throws IOException {
203-
sc.close();
204-
}
205-
};
180+
return new SocketInputStream(sc, () -> timeout);
206181
}
207182

208183
@Override
@@ -213,21 +188,7 @@ public OutputStream getOutputStream() throws IOException {
213188
throw new SocketException("Socket is not connected");
214189
if (!sc.isOutputOpen())
215190
throw new SocketException("Socket output is shutdown");
216-
return new OutputStream() {
217-
@Override
218-
public void write(int b) throws IOException {
219-
byte[] a = new byte[]{(byte) b};
220-
write(a, 0, 1);
221-
}
222-
@Override
223-
public void write(byte[] b, int off, int len) throws IOException {
224-
sc.blockingWriteFully(b, off, len);
225-
}
226-
@Override
227-
public void close() throws IOException {
228-
sc.close();
229-
}
230-
};
191+
return new SocketOutputStream(sc);
231192
}
232193

233194
private void setBooleanOption(SocketOption<Boolean> name, boolean value)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation. Oracle designates this
8+
* particular file as subject to the "Classpath" exception as provided
9+
* by Oracle in the LICENSE file that accompanied this code.
10+
*
11+
* This code is distributed in the hope that it will be useful, but WITHOUT
12+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14+
* version 2 for more details (a copy is included in the LICENSE file that
15+
* accompanied this code).
16+
*
17+
* You should have received a copy of the GNU General Public License version
18+
* 2 along with this work; if not, write to the Free Software Foundation,
19+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20+
*
21+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22+
* or visit www.oracle.com if you need additional information or have any
23+
* questions.
24+
*/
25+
package sun.nio.ch;
26+
27+
import java.io.IOException;
28+
import java.io.InputStream;
29+
import java.util.function.IntSupplier;
30+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
31+
32+
/**
33+
* An InputStream that reads bytes from a socket channel.
34+
*/
35+
class SocketInputStream extends InputStream {
36+
private final SocketChannelImpl sc;
37+
private final IntSupplier timeoutSupplier;
38+
39+
/**
40+
* Initialize a SocketInputStream that reads from the given socket channel.
41+
* @param sc the socket channel
42+
* @param timeoutSupplier supplies the read timeout, in milliseconds
43+
*/
44+
SocketInputStream(SocketChannelImpl sc, IntSupplier timeoutSupplier) {
45+
this.sc = sc;
46+
this.timeoutSupplier = timeoutSupplier;
47+
}
48+
49+
/**
50+
* Initialize a SocketInputStream that reads from the given socket channel.
51+
*/
52+
SocketInputStream(SocketChannelImpl sc) {
53+
this(sc, () -> 0);
54+
}
55+
56+
@Override
57+
public int read() throws IOException {
58+
byte[] a = new byte[1];
59+
int n = read(a, 0, 1);
60+
return (n > 0) ? (a[0] & 0xff) : -1;
61+
}
62+
63+
@Override
64+
public int read(byte[] b, int off, int len) throws IOException {
65+
int timeout = timeoutSupplier.getAsInt();
66+
if (timeout > 0) {
67+
long nanos = MILLISECONDS.toNanos(timeout);
68+
return sc.blockingRead(b, off, len, nanos);
69+
} else {
70+
return sc.blockingRead(b, off, len, 0);
71+
}
72+
}
73+
74+
@Override
75+
public int available() throws IOException {
76+
return sc.available();
77+
}
78+
79+
@Override
80+
public void close() throws IOException {
81+
sc.close();
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation. Oracle designates this
8+
* particular file as subject to the "Classpath" exception as provided
9+
* by Oracle in the LICENSE file that accompanied this code.
10+
*
11+
* This code is distributed in the hope that it will be useful, but WITHOUT
12+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14+
* version 2 for more details (a copy is included in the LICENSE file that
15+
* accompanied this code).
16+
*
17+
* You should have received a copy of the GNU General Public License version
18+
* 2 along with this work; if not, write to the Free Software Foundation,
19+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20+
*
21+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22+
* or visit www.oracle.com if you need additional information or have any
23+
* questions.
24+
*/
25+
package sun.nio.ch;
26+
27+
import java.io.IOException;
28+
import java.io.OutputStream;
29+
30+
/**
31+
* An OutputStream that writes bytes to a socket channel.
32+
*/
33+
class SocketOutputStream extends OutputStream {
34+
private final SocketChannelImpl sc;
35+
36+
/**
37+
* Initialize a SocketOutputStream that writes to the given socket channel.
38+
*/
39+
SocketOutputStream(SocketChannelImpl sc) {
40+
this.sc = sc;
41+
}
42+
43+
/**
44+
* Returns the socket channel.
45+
*/
46+
SocketChannelImpl channel() {
47+
return sc;
48+
}
49+
50+
@Override
51+
public void write(int b) throws IOException {
52+
byte[] a = new byte[]{(byte) b};
53+
write(a, 0, 1);
54+
}
55+
56+
@Override
57+
public void write(byte[] b, int off, int len) throws IOException {
58+
sc.blockingWriteFully(b, off, len);
59+
}
60+
61+
@Override
62+
public void close() throws IOException {
63+
sc.close();
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation. Oracle designates this
8+
* particular file as subject to the "Classpath" exception as provided
9+
* by Oracle in the LICENSE file that accompanied this code.
10+
*
11+
* This code is distributed in the hope that it will be useful, but WITHOUT
12+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14+
* version 2 for more details (a copy is included in the LICENSE file that
15+
* accompanied this code).
16+
*
17+
* You should have received a copy of the GNU General Public License version
18+
* 2 along with this work; if not, write to the Free Software Foundation,
19+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20+
*
21+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22+
* or visit www.oracle.com if you need additional information or have any
23+
* questions.
24+
*/
25+
package sun.nio.ch;
26+
27+
import java.io.InputStream;
28+
import java.io.OutputStream;
29+
import java.nio.channels.ReadableByteChannel;
30+
import java.nio.channels.WritableByteChannel;
31+
32+
/**
33+
* Factory methods for input/output streams based on channels.
34+
*/
35+
public class Streams {
36+
private Streams() { }
37+
38+
/**
39+
* Return an input stream that reads bytes from the given channel.
40+
*/
41+
public static InputStream of(ReadableByteChannel ch) {
42+
if (ch instanceof SocketChannelImpl sc) {
43+
return new SocketInputStream(sc);
44+
} else {
45+
return new ChannelInputStream(ch);
46+
}
47+
}
48+
49+
/**
50+
* Return an output stream that writes bytes to the given channel.
51+
*/
52+
public static OutputStream of(WritableByteChannel ch) {
53+
if (ch instanceof SocketChannelImpl sc) {
54+
return new SocketOutputStream(sc);
55+
} else {
56+
return new ChannelOutputStream(ch);
57+
}
58+
}
59+
}

‎test/jdk/java/nio/channels/Channels/SocketChannelStreams.java

+480
Large diffs are not rendered by default.

1 commit comments

Comments
 (1)

openjdk-notifier[bot] commented on Jan 6, 2022

@openjdk-notifier[bot]
Please sign in to comment.