update to latest quic native, add quic tests

This commit is contained in:
Jörg Prante 2024-02-04 22:47:03 +01:00
parent e293f2e603
commit 938371e9eb
46 changed files with 5231 additions and 19 deletions

View file

@ -42,12 +42,12 @@ Subproject organization
Original netty subproject names are not related to package names. I reorganized the names to allow better assignment
between subproject name, package name, artifact names, and java module. The following reorgnizations were performed:
netty/all -> [todo]
netty/bom -> [todo]
netty/all ->
netty/bom ->
netty/buffer -> netty-buffer
netty/codec -> netty-handler-codec, netty-handler-codec-compression, netty-handler-codec-protobuf
netty/codec-dns -> [todo]
netty/codec-haproxy -> [todo]
netty/codec-dns -> netty-handler-codec-dns
netty/codec-haproxy ->
netty/codec-http -> netty-handler-codec-http, netty-handler-codec-rtsp, netty-handler-codec-spdy
netty/codec-http2 ->
netty/codec-memcache ->
@ -62,7 +62,7 @@ netty/handler -> netty-handler
netty/handler-proxy
netty/handler-ssl-ocsp
netty/resolver -> netty-resolver
netty/resolver-dns ->
netty/resolver-dns -> netty-resolver-dns
netty/resolver-dns-classes-macos -> [dropped]
netty/resolver-dns-native-macos -> [dropped]
netty/transport -> netty-channel

View file

@ -31,7 +31,7 @@ ext {
apply plugin: 'com.google.osdetector'
subprojects {
if (!it.name.endsWith('-native')) {
if (!it.name.endsWith('-native') && it.name != 'test-results') {
apply from: rootProject.file('gradle/repositories/maven.gradle')
apply from: rootProject.file('gradle/compile/java.gradle')
apply from: rootProject.file('gradle/test/junit5.gradle')

View file

@ -12,8 +12,8 @@ test {
useJUnitPlatform()
failFast = false
ignoreFailures = true
minHeapSize = "1g" // initial heap size
maxHeapSize = "2g" // maximum heap size
minHeapSize = "2g" // initial heap size
maxHeapSize = "4g" // maximum heap size
jvmArgs '--add-exports=java.base/jdk.internal=ALL-UNNAMED',
'--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
'--add-exports=java.base/sun.nio.ch=ALL-UNNAMED',

View file

@ -1,3 +1,4 @@
/* currently we do not build our C code natively, but we provide copies of the binaries in META-INF/native */
/* the static library is included in other native builds, so nothing is provided here */

View file

@ -0,0 +1,2 @@
Because we moved from "io.netty.incubator" to "io.netty", we use only a specially prepared linux x86-64 shared library

View file

@ -3,4 +3,6 @@ dependencies {
implementation project(':netty-channel-epoll')
implementation project(':netty-channel-unix')
runtimeOnly project(path: ':netty-handler-codec-quic-native', configuration: osdetector.classifier)
testImplementation testLibs.assertj
testImplementation project(':netty-handler-ssl-bouncycastle')
}

View file

@ -182,7 +182,7 @@ public final class QuicSslContextBuilder {
/**
* Enable / disable keylog. When enabled, TLS keys are logged to an internal logger named
* "io.netty.incubator.codec.quic.BoringSSLLogginKeylog" with DEBUG level, see
* "io.netty.codec.quic.BoringSSLLogginKeylog" with DEBUG level, see
* {@link BoringSSLKeylog} for detail, logging keys are following
* <a href="https://developer.mozilla.org/en-US/docs/Mozilla/Projects/NSS/Key_Log_Format">
* NSS Key Log Format</a>. This is intended for debugging use with tools like Wireshark.

View file

@ -0,0 +1,46 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.quic;
import io.netty.util.concurrent.ImmediateExecutor;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Timeout;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Timeout(10)
public abstract class AbstractQuicTest {
@BeforeAll
public static void ensureAvailability() {
Quic.ensureAvailability();
}
static Executor[] newSslTaskExecutors() {
return new Executor[] {
ImmediateExecutor.INSTANCE,
Executors.newSingleThreadExecutor()
};
}
static void shutdown(Executor executor) {
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdown();
}
}
}

View file

@ -0,0 +1,38 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.quic;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FlushStrategyTest {
@Test
public void testAfterNumBytes() {
FlushStrategy strategy = FlushStrategy.afterNumBytes(10);
assertFalse(strategy.shouldFlushNow(1, 10));
assertTrue(strategy.shouldFlushNow(1, 11));
}
@Test
public void testAfterNumPackets() {
FlushStrategy strategy = FlushStrategy.afterNumPackets(10);
assertFalse(strategy.shouldFlushNow(10, 10));
assertTrue(strategy.shouldFlushNow(11, 11));
}
}

View file

@ -0,0 +1,82 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.quic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.jupiter.api.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ThreadLocalRandom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class InsecureQuicTokenHandlerTest extends AbstractQuicTest {
@Test
public void testMaxTokenLength() {
assertEquals(InsecureQuicTokenHandler.MAX_TOKEN_LEN, InsecureQuicTokenHandler.INSTANCE.maxTokenLength());
}
@Test
public void testTokenProcessingIpv4() throws UnknownHostException {
testTokenProcessing(true);
}
@Test
public void testTokenProcessingIpv6() throws UnknownHostException {
testTokenProcessing(false);
}
private static void testTokenProcessing(boolean ipv4) throws UnknownHostException {
byte[] bytes = new byte[Quiche.QUICHE_MAX_CONN_ID_LEN];
ThreadLocalRandom.current().nextBytes(bytes);
ByteBuf dcid = Unpooled.wrappedBuffer(bytes);
ByteBuf out = Unpooled.buffer();
try {
final InetSocketAddress validAddress;
final InetSocketAddress invalidAddress;
if (ipv4) {
validAddress = new InetSocketAddress(
InetAddress.getByAddress(new byte[] { 10, 10, 10, 1}), 9999);
invalidAddress = new InetSocketAddress(
InetAddress.getByAddress(new byte[] { 10, 10, 10, 10}), 9999);
} else {
validAddress = new InetSocketAddress(InetAddress.getByAddress(
new byte[] { 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 1}), 9999);
invalidAddress = new InetSocketAddress(InetAddress.getByAddress(
new byte[] { 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10}), 9999);
}
InsecureQuicTokenHandler.INSTANCE.writeToken(out, dcid, validAddress);
assertThat(out.readableBytes(), lessThanOrEqualTo(InsecureQuicTokenHandler.INSTANCE.maxTokenLength()));
assertNotEquals(-1, InsecureQuicTokenHandler.INSTANCE.validateToken(out, validAddress));
// Use another address and check that the validate fails.
assertEquals(-1, InsecureQuicTokenHandler.INSTANCE.validateToken(out, invalidAddress));
} finally {
dcid.release();
out.release();
}
}
}

View file

@ -0,0 +1,305 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.quic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class QuicChannelDatagramTest extends AbstractQuicTest {
private static final Random random = new Random();
static final byte[] data = new byte[512];
static {
random.nextBytes(data);
}
@ParameterizedTest
@MethodSource("newSslTaskExecutors")
public void testDatagramFlushInChannelRead(Executor executor) throws Throwable {
testDatagram(executor, false);
}
@ParameterizedTest
@MethodSource("newSslTaskExecutors")
public void testDatagramFlushInChannelReadComplete(Executor executor) throws Throwable {
testDatagram(executor, true);
}
private void testDatagram(Executor executor, boolean flushInReadComplete) throws Throwable {
AtomicReference<QuicDatagramExtensionEvent> serverEventRef = new AtomicReference<>();
QuicChannelValidationHandler serverHandler = new QuicChannelValidationHandler() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
final ChannelFuture future;
if (!flushInReadComplete) {
future = ctx.writeAndFlush(msg);
} else {
future = ctx.write(msg);
}
future.addListener(ChannelFutureListener.CLOSE);
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
if (flushInReadComplete) {
ctx.flush();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof QuicDatagramExtensionEvent) {
serverEventRef.set((QuicDatagramExtensionEvent) evt);
}
super.userEventTriggered(ctx, evt);
}
};
Channel server = QuicTestUtils.newServer(QuicTestUtils.newQuicServerBuilder(executor)
.datagram(10, 10),
InsecureQuicTokenHandler.INSTANCE, serverHandler , new ChannelInboundHandlerAdapter());
InetSocketAddress address = (InetSocketAddress) server.localAddress();
Promise<ByteBuf> receivedBuffer = ImmediateEventExecutor.INSTANCE.newPromise();
AtomicReference<QuicDatagramExtensionEvent> clientEventRef = new AtomicReference<>();
Channel channel = QuicTestUtils.newClient(QuicTestUtils.newQuicClientBuilder(executor)
.datagram(10, 10));
QuicChannelValidationHandler clientHandler = new QuicChannelValidationHandler() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!receivedBuffer.trySuccess((ByteBuf) msg)) {
ReferenceCountUtil.release(msg);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof QuicDatagramExtensionEvent) {
clientEventRef.set((QuicDatagramExtensionEvent) evt);
}
super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
receivedBuffer.tryFailure(cause);
super.exceptionCaught(ctx, cause);
}
};
try {
QuicChannel quicChannel = QuicTestUtils.newQuicChannelBootstrap(channel)
.handler(clientHandler)
.remoteAddress(address)
.connect()
.get();
quicChannel.writeAndFlush(Unpooled.copiedBuffer(data)).sync();
ByteBuf buffer = receivedBuffer.get();
ByteBuf expected = Unpooled.wrappedBuffer(data);
assertEquals(expected, buffer);
buffer.release();
expected.release();
assertNotEquals(0, serverEventRef.get().maxLength());
assertNotEquals(0, clientEventRef.get().maxLength());
quicChannel.close().sync();
serverHandler.assertState();
clientHandler.assertState();
} finally {
server.close().sync();
// Close the parent Datagram channel as well.
channel.close().sync();
shutdown(executor);
}
}
@ParameterizedTest
@MethodSource("newSslTaskExecutors")
public void testDatagramNoAutoReadMaxMessagesPerRead1(Executor executor) throws Throwable {
testDatagramNoAutoRead(executor, 1, false);
}
@ParameterizedTest
@MethodSource("newSslTaskExecutors")
public void testDatagramNoAutoReadMaxMessagesPerRead3(Executor executor) throws Throwable {
testDatagramNoAutoRead(executor, 3, false);
}
@ParameterizedTest
@MethodSource("newSslTaskExecutors")
public void testDatagramNoAutoReadMaxMessagesPerRead1OutSideEventLoop(Executor executor) throws Throwable {
testDatagramNoAutoRead(executor, 1, true);
}
@ParameterizedTest
@MethodSource("newSslTaskExecutors")
public void testDatagramNoAutoReadMaxMessagesPerRead3OutSideEventLoop(Executor executor) throws Throwable {
testDatagramNoAutoRead(executor, 3, true);
}
private void testDatagramNoAutoRead(Executor executor, int maxMessagesPerRead, boolean readLater) throws Throwable {
Promise<Void> serverPromise = ImmediateEventExecutor.INSTANCE.newPromise();
Promise<ByteBuf> clientPromise = ImmediateEventExecutor.INSTANCE.newPromise();
int numDatagrams = 5;
AtomicInteger serverReadCount = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(numDatagrams);
QuicChannelValidationHandler serverHandler = new QuicChannelValidationHandler() {
private int readPerLoop;
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
readPerLoop++;
ctx.writeAndFlush(msg).addListener(future -> {
if (future.isSuccess()) {
latch.countDown();
}
});
if (serverReadCount.incrementAndGet() == numDatagrams) {
serverPromise.trySuccess(null);
}
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
if (readPerLoop > maxMessagesPerRead) {
ctx.close();
serverPromise.tryFailure(new AssertionError(
"Read more then " + maxMessagesPerRead + " time per read loop"));
return;
}
readPerLoop = 0;
if (serverReadCount.get() < numDatagrams) {
if (readLater) {
ctx.executor().execute(ctx::read);
} else {
ctx.read();
}
}
}
};
Channel server = QuicTestUtils.newServer(QuicTestUtils.newQuicServerBuilder(executor)
.option(ChannelOption.AUTO_READ, false)
.option(ChannelOption.MAX_MESSAGES_PER_READ, maxMessagesPerRead)
.datagram(10, 10),
InsecureQuicTokenHandler.INSTANCE, serverHandler, new ChannelInboundHandlerAdapter());
InetSocketAddress address = (InetSocketAddress) server.localAddress();
Channel channel = QuicTestUtils.newClient(QuicTestUtils.newQuicClientBuilder(executor)
.datagram(10, 10));
AtomicInteger clientReadCount = new AtomicInteger();
QuicChannelValidationHandler clientHandler = new QuicChannelValidationHandler() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
if (clientReadCount.incrementAndGet() == numDatagrams) {
if (!clientPromise.trySuccess((ByteBuf) msg)) {
ReferenceCountUtil.release(msg);
}
} else {
ReferenceCountUtil.release(msg);
}
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
clientPromise.tryFailure(cause);
}
};
try {
QuicChannel quicChannel = QuicTestUtils.newQuicChannelBootstrap(channel)
.handler(clientHandler)
.remoteAddress(address)
.connect()
.get();
for (int i = 0; i < numDatagrams; i++) {
quicChannel.writeAndFlush(Unpooled.copiedBuffer(data)).sync();
// Let's add some sleep in between as this is UDP so we may loose some data otherwise.
Thread.sleep(50);
}
assertTrue(serverPromise.await(3000), "Server received: " + serverReadCount.get() +
", Client received: " + clientReadCount.get());
serverPromise.sync();
assertTrue(clientPromise.await(3000), "Server received: " + serverReadCount.get() +
", Client received: " + clientReadCount.get());
ByteBuf buffer = clientPromise.get();
ByteBuf expected = Unpooled.wrappedBuffer(data);
assertEquals(expected, buffer);
buffer.release();
expected.release();
quicChannel.close().sync();
serverHandler.assertState();
clientHandler.assertState();
} finally {
server.close().sync();
// Close the parent Datagram channel as well.
channel.close().sync();
shutdown(executor);
}
}
}

View file

@ -0,0 +1,439 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.quic;
import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledDirectByteBuf;
import io.netty.buffer.UnpooledHeapByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateExecutor;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class QuicChannelEchoTest extends AbstractQuicTest {
private static final Random random = new Random();
static final byte[] data = new byte[1048576];
static {
random.nextBytes(data);
}
public static Collection<Object[]> data() {
List<Object[]> config = new ArrayList<>();
for (int a = 0; a < 2; a++) {
for (int b = 0; b < 2; b++) {
for (int c = 0; c < 2; c++) {
config.add(new Object[] { a == 0, b == 0, c == 0 });
}
}
}
return config;
}
private void setAllocator(Channel channel, ByteBufAllocator allocator) {
channel.config().setAllocator(allocator);
}
private ByteBufAllocator getAllocator(boolean directBuffer) {
if (directBuffer) {
return new UnpooledByteBufAllocator(true);
} else {
// Force usage of heap buffers and also ensure memoryAddress() is not not supported.
return new AbstractByteBufAllocator(false) {
@Override
public ByteBuf ioBuffer() {
return heapBuffer();
}
@Override
public ByteBuf ioBuffer(int initialCapacity) {
return heapBuffer(initialCapacity);
}
@Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
return heapBuffer(initialCapacity, maxCapacity);
}
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
return new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
@Override
public boolean isDirectBufferPooled() {
return false;
}
};
}
}
@ParameterizedTest(name =
"{index}: autoRead = {0}, directBuffer = {1}, composite = {2}")
@MethodSource("data")
public void testEchoStartedFromServer(boolean autoRead, boolean directBuffer, boolean composite) throws Throwable {
ByteBufAllocator allocator = getAllocator(directBuffer);
final EchoHandler sh = new EchoHandler(true, autoRead, allocator);
final EchoHandler ch = new EchoHandler(false, autoRead, allocator);
AtomicReference<List<ChannelFuture>> writeFutures = new AtomicReference<>();
Channel server = QuicTestUtils.newServer(ImmediateExecutor.INSTANCE, new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
setAllocator(ctx.channel(), allocator);
((QuicChannel) ctx.channel()).createStream(QuicStreamType.BIDIRECTIONAL, sh)
.addListener((Future<QuicStreamChannel> future) -> {
QuicStreamChannel stream = future.getNow();
setAllocator(stream, allocator);
List<ChannelFuture> futures = writeAllData(stream, composite, allocator);
writeFutures.set(futures);
});
ctx.channel().config().setAutoRead(autoRead);
if (!autoRead) {
ctx.read();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
if (!autoRead) {
ctx.read();
}
}
}, sh);
setAllocator(server, allocator);
InetSocketAddress address = (InetSocketAddress) server.localAddress();
Channel channel = QuicTestUtils.newClient(ImmediateExecutor.INSTANCE);
QuicChannel quicChannel = null;
try {
quicChannel = QuicTestUtils.newQuicChannelBootstrap(channel)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
if (!autoRead) {
ctx.read();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
if (!autoRead) {
ctx.read();
}
}
})
.streamHandler(ch)
// Use the same allocator for the streams.
.streamOption(ChannelOption.ALLOCATOR, allocator)
.remoteAddress(address)
.option(ChannelOption.AUTO_READ, autoRead)
.option(ChannelOption.ALLOCATOR, allocator)
.connect()
.get();
waitForData(ch, sh);
for (;;) {
List<ChannelFuture> futures = writeFutures.get();
if (futures != null) {
for (ChannelFuture f: futures) {
f.sync();
}
break;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore.
}
}
waitForData(sh, ch);
// Close underlying streams.
sh.channel.close().sync();
ch.channel.close().sync();
// Close underlying quic channels
sh.channel.parent().close().sync();
ch.channel.parent().close().sync();
checkForException(ch, sh);
} finally {
server.close().sync();
QuicTestUtils.closeIfNotNull(quicChannel);
// Close the parent Datagram channel as well.
channel.close().sync();
}
}
@ParameterizedTest(name =
"{index}: autoRead = {0}, directBuffer = {1}, composite = {2}")
@MethodSource("data")
public void testEchoStartedFromClient(boolean autoRead, boolean directBuffer, boolean composite) throws Throwable {
ByteBufAllocator allocator = getAllocator(directBuffer);
final EchoHandler sh = new EchoHandler(true, autoRead, allocator);
final EchoHandler ch = new EchoHandler(false, autoRead, allocator);
QuicChannelValidationHandler serverHandler = new QuicChannelValidationHandler() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
setAllocator(ctx.channel(), allocator);
ctx.channel().config().setAutoRead(autoRead);
if (!autoRead) {
ctx.read();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
if (!autoRead) {
ctx.read();
}
}
};
Channel server = QuicTestUtils.newServer(ImmediateExecutor.INSTANCE, serverHandler, sh);
setAllocator(server, allocator);
InetSocketAddress address = (InetSocketAddress) server.localAddress();
Channel channel = QuicTestUtils.newClient(ImmediateExecutor.INSTANCE);
QuicChannel quicChannel = null;
try {
QuicChannelValidationHandler clientHandler = new QuicChannelValidationHandler() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
if (!autoRead) {
ctx.read();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
if (!autoRead) {
ctx.read();
}
}
};
quicChannel = QuicTestUtils.newQuicChannelBootstrap(channel)
.handler(clientHandler)
.streamHandler(ch)
// Use the same allocator for the streams.
.streamOption(ChannelOption.ALLOCATOR, allocator)
.remoteAddress(address)
.option(ChannelOption.AUTO_READ, autoRead)
.option(ChannelOption.ALLOCATOR, allocator)
.connect()
.get();
QuicStreamChannel stream = quicChannel.createStream(QuicStreamType.BIDIRECTIONAL, ch).sync().getNow();
setAllocator(stream, allocator);
assertEquals(QuicStreamType.BIDIRECTIONAL, stream.type());
assertEquals(0, stream.streamId());
assertTrue(stream.isLocalCreated());
for (int i = 0; i < 5; i++) {
ch.counter = 0;
sh.counter = 0;
List<ChannelFuture> futures = writeAllData(stream, composite, allocator);
for (ChannelFuture f : futures) {
f.sync();
}
waitForData(ch, sh);
waitForData(sh, ch);
Thread.sleep(100);
}
// Close underlying streams.
sh.channel.close().sync();
ch.channel.close().sync();
// Close underlying quic channels
sh.channel.parent().close().sync();
ch.channel.parent().close().sync();
checkForException(ch, sh);
serverHandler.assertState();
clientHandler.assertState();
} finally {
server.close().syncUninterruptibly();
QuicTestUtils.closeIfNotNull(quicChannel);
// Close the parent Datagram channel as well.
channel.close().sync();
}
}
private List<ChannelFuture> writeAllData(Channel channel, boolean composite, ByteBufAllocator allocator) {
if (composite) {
CompositeByteBuf compositeByteBuf = allocator.compositeBuffer();
for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
ByteBuf buf = allocator.buffer().writeBytes(data, i, length);
compositeByteBuf.addComponent(true, buf);
i += length;
}
return Collections.singletonList(channel.writeAndFlush(compositeByteBuf));
} else {
List<ChannelFuture> futures = new ArrayList<>();
for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
ByteBuf buf = allocator.buffer().writeBytes(data, i, length);
futures.add(channel.writeAndFlush(buf));
i += length;
}
return futures;
}
}
private static void waitForData(EchoHandler h1, EchoHandler h2) {
while (h1.counter < data.length) {
if (h2.exception.get() != null) {
break;
}
if (h1.exception.get() != null) {
break;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore.
}
}
}
private static void checkForException(EchoHandler h1, EchoHandler h2) throws Throwable {
if (h1.exception.get() != null && !(h1.exception.get() instanceof IOException)) {
throw h1.exception.get();
}
if (h2.exception.get() != null && !(h2.exception.get() instanceof IOException)) {
throw h2.exception.get();
}
if (h1.exception.get() != null) {
throw h1.exception.get();
}
if (h2.exception.get() != null) {
throw h2.exception.get();
}
}
private class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean server;
private final boolean autoRead;
private final ByteBufAllocator allocator;
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<>();
volatile int counter;
EchoHandler(boolean server, boolean autoRead, ByteBufAllocator allocator) {
this.server = server;
this.autoRead = autoRead;
this.allocator = allocator;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
ctx.channel().config().setAutoRead(autoRead);
setAllocator(ctx.channel(), allocator);
ctx.fireChannelRegistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
channel = ctx.channel();
QuicStreamChannel channel = (QuicStreamChannel) ctx.channel();
assertEquals(QuicStreamType.BIDIRECTIONAL, channel.type());
if (channel.isLocalCreated()) {
// Server starts with 1, client with 0
assertEquals(server ? 1 : 0, channel.streamId());
} else {
// Server starts with 1, client with 0
assertEquals(server ? 0 : 1, channel.streamId());
}
if (!autoRead) {
ctx.read();
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual);
int lastIdx = counter;
for (int i = 0; i < actual.length; i ++) {
assertEquals(data[i + lastIdx], actual[i]);
}
if (!((QuicStreamChannel) ctx.channel()).isLocalCreated()) {
channel.write(Unpooled.wrappedBuffer(actual));
}
counter += actual.length;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
if (exception.compareAndSet(null, cause)) {
cause.printStackTrace();
ctx.close();
}
}
}
}

View file

@ -0,0 +1,36 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.quic;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
class QuicChannelValidationHandler extends ChannelInboundHandlerAdapter {
private volatile Throwable cause;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
this.cause = cause;
}
void assertState() throws Throwable {
if (cause != null) {
throw cause;
}
}
}

View file

@ -0,0 +1,100 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.quic;
import io.netty.channel.ChannelHandler;
import io.netty.util.concurrent.ImmediateExecutor;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
import java.util.concurrent.Executor;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
class QuicCodecBuilderTest {
@Test
void testCopyConstructor() throws IllegalAccessException {
TestQuicCodecBuilder original = new TestQuicCodecBuilder();
init(original);
TestQuicCodecBuilder copy = new TestQuicCodecBuilder(original);
assertThat(copy).usingRecursiveComparison().isEqualTo(original);
}
private static void init(TestQuicCodecBuilder builder) throws IllegalAccessException {
Field[] fields = builder.getClass().getSuperclass().getDeclaredFields();
for (Field field : fields) {
modifyField(builder, field);
}
}
private static void modifyField(TestQuicCodecBuilder builder, Field field) throws IllegalAccessException {
field.setAccessible(true);
Class<?> clazz = field.getType();
if (Boolean.class == clazz) {
field.set(builder, Boolean.TRUE);
} else if (Integer.class == clazz) {
field.set(builder, Integer.MIN_VALUE);
} else if (Long.class == clazz) {
field.set(builder, Long.MIN_VALUE);
} else if (QuicCongestionControlAlgorithm.class == clazz) {
field.set(builder, QuicCongestionControlAlgorithm.CUBIC);
} else if (FlushStrategy.class == clazz) {
field.set(builder, FlushStrategy.afterNumBytes(10));
} else if (Function.class == clazz) {
field.set(builder, Function.identity());
} else if (boolean.class == clazz) {
field.setBoolean(builder, true);
} else if (int.class == clazz) {
field.setInt(builder, -1);
} else if (byte[].class == clazz) {
field.set(builder, new byte[16]);
} else if (Executor.class == clazz) {
field.set(builder, ImmediateExecutor.INSTANCE);
} else {
throw new IllegalArgumentException("Unknown field type " + clazz);
}
}
private static final class TestQuicCodecBuilder extends QuicCodecBuilder<TestQuicCodecBuilder> {
TestQuicCodecBuilder() {
super(true);
}
TestQuicCodecBuilder(TestQuicCodecBuilder builder) {
super(builder);
}
@Override
public TestQuicCodecBuilder clone() {
// no-op
return null;
}
@Override
protected ChannelHandler build(
QuicheConfig config,
Function<QuicChannel, ? extends QuicSslEngine> sslContextProvider,
Executor sslTaskExecutor,
int localConnIdLength,
FlushStrategy flushStrategy) {
// no-op
return null;
}
}
}

View file

@ -0,0 +1,59 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.quic;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class QuicConnectionAddressTest extends AbstractQuicTest {
@Test
public void testNullByteArray() {
assertThrows(NullPointerException.class, () -> new QuicConnectionAddress((byte[]) null));
}
@Test
public void testNullByteBuffer() {
assertThrows(NullPointerException.class, () -> new QuicConnectionAddress((ByteBuffer) null));
}
@Test
public void testByteArrayIsCloned() {
byte[] bytes = new byte[8];
ThreadLocalRandom.current().nextBytes(bytes);
QuicConnectionAddress address = new QuicConnectionAddress(bytes);
assertEquals(ByteBuffer.wrap(bytes), address.connId);
ThreadLocalRandom.current().nextBytes(bytes);
assertNotEquals(ByteBuffer.wrap(bytes), address.connId);
}
@Test
public void tesByteBufferIsDuplicated() {
byte[] bytes = new byte[8];
ThreadLocalRandom.current().nextBytes(bytes);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
QuicConnectionAddress address = new QuicConnectionAddress(bytes);
assertEquals(buffer, address.connId);
buffer.position(1);
assertNotEquals(buffer, address.connId);
}
}

View file

@ -0,0 +1,91 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.quic;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class QuicConnectionIdGeneratorTest extends AbstractQuicTest {
@Test
public void testRandomness() {
QuicConnectionIdGenerator idGenerator = QuicConnectionIdGenerator.randomGenerator();
ByteBuffer id = idGenerator.newId(Quiche.QUICHE_MAX_CONN_ID_LEN);
ByteBuffer id2 = idGenerator.newId(Quiche.QUICHE_MAX_CONN_ID_LEN);
assertThat(id.remaining(), greaterThan(0));
assertThat(id2.remaining(), greaterThan(0));
assertNotEquals(id, id2);
id = idGenerator.newId(10);
id2 = idGenerator.newId(10);
assertEquals(10, id.remaining());
assertEquals(10, id2.remaining());
assertNotEquals(id, id2);
byte[] input = new byte[1024];
ThreadLocalRandom.current().nextBytes(input);
id = idGenerator.newId(ByteBuffer.wrap(input), 10);
id2 = idGenerator.newId(ByteBuffer.wrap(input), 10);
assertEquals(10, id.remaining());
assertEquals(10, id2.remaining());
assertNotEquals(id, id2);
}