add epoll native

This commit is contained in:
Jörg Prante 2024-01-13 11:34:04 +01:00
parent 49b6c772a5
commit 3d532a821f
180 changed files with 19923 additions and 9 deletions

View file

@ -25,6 +25,7 @@ The following changes were performed on the original source code:
- removed all macos related code (including kqueue)
- removed all aarch64 related code
- removed the direct brotli4j dependency by rewriting Brotli4jOptions to not use Encoder.Parameters
- removed deprecated channel-udt (transport-udt)
Challenges for Netty build on JDK 21

View file

@ -10,13 +10,11 @@ dependencies {
test {
useJUnitPlatform()
failFast = true
failFast = false
testLogging {
events 'STARTED', 'PASSED', 'FAILED', 'SKIPPED'
showStandardStreams = true
}
minHeapSize = "1g" // initial heap size
maxHeapSize = "2g" // maximum heap size
jvmArgs '--add-exports=java.base/jdk.internal=ALL-UNNAMED',
'--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
'--add-exports=java.base/sun.nio.ch=ALL-UNNAMED',
@ -30,11 +28,6 @@ test {
'--add-opens=jdk.unsupported/sun.misc=ALL-UNNAMED',
'-Dio.netty.bootstrap.extensions=serviceload'
systemProperty 'java.util.logging.config.file', 'src/test/resources/logging.properties'
// we have remove native images, this is no longer used
systemProperty "nativeImage.handlerMetadataGroupId", "io.netty"
// we have remove native images
// we have remove native images, this is no longer used
systemProperty "nativeimage.handlerMetadataArtifactId", "netty-transport"
afterSuite { desc, result ->
if (!desc.parent) {
println "\nTest result: ${result.resultType}"

View file

@ -0,0 +1,82 @@
apply plugin: 'com.google.osdetector'
dependencies {
testImplementation project(':netty-channel-unix')
testImplementation project(':netty-channel-epoll')
testImplementation project(':netty-testsuite')
testImplementation project(':netty-handler')
testImplementation testLibs.assertj
testImplementation testLibs.rerunner.jupiter
testRuntimeOnly project(path: ':netty-tcnative-boringssl-static', configuration: osdetector.classifier)
}
task nettyEpollLinuxX8664(type: Jar) {
archiveBaseName.set('netty-channel-epoll-native')
archiveClassifier.set('linux-x86_64')
version rootProject.version
from (sourceSets.main.output) {
include 'META-INF/native/libnetty_transport_native_epoll_x86_64.so'
}
}
assemble.dependsOn(nettyEpollLinuxX8664)
configurations {
'linux-x86_64' {
canBeConsumed = true
canBeResolved = false
extendsFrom runtimeOnly
}
}
artifacts {
'linux-x86_64'(nettyEpollLinuxX8664)
}
publishing {
publications {
publishNettyEpollLinuxX8664(MavenPublication) {
groupId rootProject.group
artifactId project.name
version rootProject.version
artifact nettyEpollLinuxX8664
pom {
artifactId = project.name
name = project.name
version = project.version
description = rootProject.ext.description
url = rootProject.ext.url
inceptionYear = rootProject.ext.inceptionYear
packaging = 'jar'
organization {
name = rootProject.ext.organizationName
url = rootProject.ext.organizationUrl
}
developers {
developer {
id = 'jprante'
name = 'Jörg Prante'
email = 'joergprante@gmail.com'
url = 'https://xbib.org/joerg'
}
}
scm {
url = rootProject.ext.scmUrl
connection = rootProject.ext.scmConnection
developerConnection = rootProject.ext.scmDeveloperConnection
}
issueManagement {
system = rootProject.ext.issueManagementSystem
url = rootProject.ext.issueManagementUrl
}
licenses {
license {
name = rootProject.ext.licenseName
url = rootProject.ext.licenseUrl
distribution = 'repo'
}
}
}
}
}
}

View file

@ -0,0 +1,212 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
public abstract class DetectPeerCloseWithoutReadTest {
protected abstract EventLoopGroup newGroup();
protected abstract Class<? extends ServerChannel> serverChannel();
protected abstract Class<? extends Channel> clientChannel();
@Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
public void clientCloseWithoutServerReadIsDetectedNoExtraReadRequested() throws InterruptedException {
clientCloseWithoutServerReadIsDetected0(false);
}
@Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
public void clientCloseWithoutServerReadIsDetectedExtraReadRequested() throws InterruptedException {
clientCloseWithoutServerReadIsDetected0(true);
}
private void clientCloseWithoutServerReadIsDetected0(final boolean extraReadRequested)
throws InterruptedException {
EventLoopGroup serverGroup = null;
EventLoopGroup clientGroup = null;
Channel serverChannel = null;
try {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger bytesRead = new AtomicInteger();
final int expectedBytes = 100;
serverGroup = newGroup();
clientGroup = newGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverGroup);
sb.channel(serverChannel());
// Ensure we read only one message per read() call and that we need multiple read()
// calls to consume everything.
sb.childOption(ChannelOption.AUTO_READ, false);
sb.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
sb.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(expectedBytes / 10));
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
Bootstrap cb = new Bootstrap();
cb.group(serverGroup);
cb.channel(clientChannel());
cb.handler(new ChannelInboundHandlerAdapter());
Channel clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
ByteBuf buf = clientChannel.alloc().buffer(expectedBytes);
buf.writerIndex(buf.writerIndex() + expectedBytes);
clientChannel.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
latch.await();
assertEquals(expectedBytes, bytesRead.get());
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (serverGroup != null) {
serverGroup.shutdownGracefully();
}
if (clientGroup != null) {
clientGroup.shutdownGracefully();
}
}
}
@Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
public void serverCloseWithoutClientReadIsDetectedNoExtraReadRequested() throws InterruptedException {
serverCloseWithoutClientReadIsDetected0(false);
}
@Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
public void serverCloseWithoutClientReadIsDetectedExtraReadRequested() throws InterruptedException {
serverCloseWithoutClientReadIsDetected0(true);
}
private void serverCloseWithoutClientReadIsDetected0(final boolean extraReadRequested) throws InterruptedException {
EventLoopGroup serverGroup = null;
EventLoopGroup clientGroup = null;
Channel serverChannel = null;
Channel clientChannel = null;
try {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger bytesRead = new AtomicInteger();
final int expectedBytes = 100;
serverGroup = newGroup();
clientGroup = newGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverGroup);
sb.channel(serverChannel());
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer(expectedBytes);
buf.writerIndex(buf.writerIndex() + expectedBytes);
ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
ctx.fireChannelActive();
}
});
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
Bootstrap cb = new Bootstrap();
cb.group(serverGroup);
cb.channel(clientChannel());
// Ensure we read only one message per read() call and that we need multiple read()
// calls to consume everything.
cb.option(ChannelOption.AUTO_READ, false);
cb.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
cb.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(expectedBytes / 10));
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
}
});
clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
latch.await();
assertEquals(expectedBytes, bytesRead.get());
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
if (serverGroup != null) {
serverGroup.shutdownGracefully();
}
if (clientGroup != null) {
clientGroup.shutdownGracefully();
}
}
}
private static final class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final AtomicInteger bytesRead;
private final boolean extraReadRequested;
private final CountDownLatch latch;
TestHandler(AtomicInteger bytesRead, boolean extraReadRequested, CountDownLatch latch) {
this.bytesRead = bytesRead;
this.extraReadRequested = extraReadRequested;
this.latch = latch;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
bytesRead.addAndGet(msg.readableBytes());
if (extraReadRequested) {
// Because autoread is off, we call read to consume all data until we detect the close.
ctx.read();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
latch.countDown();
ctx.fireChannelInactive();
}
}
}

View file

@ -0,0 +1,28 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.unix.DomainSocketAddress;
import java.net.SocketAddress;
import java.util.UUID;
public class EpollAbstractDomainSocketEchoTest extends EpollDomainSocketEchoTest {
@Override
protected SocketAddress newSocketAddress() {
// these don't actually show up in the file system so creating a temp file isn't reliable
return new DomainSocketAddress("\0" + System.getProperty("java.io.tmpdir") + UUID.randomUUID());
}
}

View file

@ -0,0 +1,93 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.ChannelException;
import io.netty.channel.unix.Buffer;
import io.netty.channel.unix.IntegerUnixChannelOption;
import io.netty.channel.unix.RawUnixChannelOption;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class EpollChannelConfigTest {
@Test
public void testOptionGetThrowsChannelException() throws Exception {
Epoll.ensureAvailability();
EpollSocketChannel channel = new EpollSocketChannel();
channel.config().getSoLinger();
channel.fd().close();
try {
channel.config().getSoLinger();
fail();
} catch (ChannelException e) {
// expected
}
}
@Test
public void testOptionSetThrowsChannelException() throws Exception {
Epoll.ensureAvailability();
EpollSocketChannel channel = new EpollSocketChannel();
channel.config().setKeepAlive(true);
channel.fd().close();
try {
channel.config().setKeepAlive(true);
fail();
} catch (ChannelException e) {
// expected
}
}
@Test
public void testIntegerOption() throws Exception {
Epoll.ensureAvailability();
EpollSocketChannel channel = new EpollSocketChannel();
IntegerUnixChannelOption opt = new IntegerUnixChannelOption("INT_OPT", 1, 2);
Integer zero = 0;
assertEquals(zero, channel.config().getOption(opt));
channel.config().setOption(opt, 1);
assertNotEquals(zero, channel.config().getOption(opt));
channel.fd().close();
}
@Test
public void testRawOption() throws Exception {
Epoll.ensureAvailability();
EpollSocketChannel channel = new EpollSocketChannel();
// Value for SOL_SOCKET and SO_REUSEADDR
// See https://github.com/torvalds/linux/blob/v5.17/include/uapi/asm-generic/socket.h
RawUnixChannelOption opt = new RawUnixChannelOption("RAW_OPT", 1, 2, 4);
ByteBuffer disabled = Buffer.allocateDirectWithNativeOrder(4);
disabled.putInt(0).flip();
assertEquals(disabled, channel.config().getOption(opt));
ByteBuffer enabled = Buffer.allocateDirectWithNativeOrder(4);
enabled.putInt(1).flip();
channel.config().setOption(opt, enabled);
assertNotEquals(disabled, channel.config().getOption(opt));
channel.fd().close();
}
}

View file

@ -0,0 +1,39 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelConfig;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.CompositeBufferGatheringWriteTest;
import java.util.List;
public class EpollCompositeBufferGatheringWriteTest extends CompositeBufferGatheringWriteTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
}
@Override
protected void compositeBufferPartialWriteDoesNotCorruptDataInitServerConfig(ChannelConfig config,
int soSndBuf) {
if (config instanceof EpollChannelConfig) {
((EpollChannelConfig) config).setMaxBytesPerGatheringWrite(soSndBuf);
}
}
}

View file

@ -0,0 +1,33 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class EpollDatagramChannelConfigTest {
@Test
public void testIpFreeBind() throws Exception {
Epoll.ensureAvailability();
EpollDatagramChannel channel = new EpollDatagramChannel();
assertTrue(channel.config().setOption(EpollChannelOption.IP_FREEBIND, true));
assertTrue(channel.config().getOption(EpollChannelOption.IP_FREEBIND));
channel.fd().close();
}
}

View file

@ -0,0 +1,115 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.Socket;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import static io.netty.util.NetUtil.LOCALHOST;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class EpollDatagramChannelTest {
@BeforeEach
public void setUp() {
Epoll.ensureAvailability();
}
@Test
public void testDefaultMaxMessagePerRead() {
EpollDatagramChannel channel = new EpollDatagramChannel();
assertEquals(16, channel.config().getMaxMessagesPerRead());
channel.unsafe().closeForcibly();
}
@Test
public void testNotActiveNoLocalRemoteAddress() throws IOException {
checkNotActiveNoLocalRemoteAddress(new EpollDatagramChannel());
checkNotActiveNoLocalRemoteAddress(new EpollDatagramChannel(InternetProtocolFamily.IPv4));
checkNotActiveNoLocalRemoteAddress(new EpollDatagramChannel(InternetProtocolFamily.IPv6));
}
@Test
public void testActiveHasLocalAddress() throws IOException {
Socket socket = Socket.newSocketDgram();
EpollDatagramChannel channel = new EpollDatagramChannel(socket.intValue());
InetSocketAddress localAddress = channel.localAddress();
assertTrue(channel.active);
assertNotNull(localAddress);
assertEquals(socket.localAddress(), localAddress);
channel.fd().close();
}
@Test
public void testLocalAddressBeforeAndAfterBind() {
EventLoopGroup group = new EpollEventLoopGroup(1);
try {
TestHandler handler = new TestHandler();
InetSocketAddress localAddressBeforeBind = new InetSocketAddress(LOCALHOST, 0);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(EpollDatagramChannel.class)
.localAddress(localAddressBeforeBind)
.handler(handler);
ChannelFuture future = bootstrap.bind().syncUninterruptibly();
assertNull(handler.localAddress);
SocketAddress localAddressAfterBind = future.channel().localAddress();
assertNotNull(localAddressAfterBind);
assertTrue(localAddressAfterBind instanceof InetSocketAddress);
assertTrue(((InetSocketAddress) localAddressAfterBind).getPort() != 0);
future.channel().close().syncUninterruptibly();
} finally {
group.shutdownGracefully();
}
}
private static void checkNotActiveNoLocalRemoteAddress(EpollDatagramChannel channel) throws IOException {
assertFalse(channel.active);
assertNull(channel.localAddress());
assertNull(channel.remoteAddress());
channel.fd().close();
}
private static final class TestHandler extends ChannelInboundHandlerAdapter {
private volatile SocketAddress localAddress;
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
this.localAddress = ctx.channel().localAddress();
super.channelRegistered(ctx);
}
}
}

View file

@ -0,0 +1,30 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramConnectNotExistsTest;
import java.util.List;
public class EpollDatagramConnectNotExistsTest extends DatagramConnectNotExistsTest {
@Override
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagramSocket();
}
}

View file

@ -0,0 +1,30 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramMulticastIPv6Test;
import java.util.List;
public class EpollDatagramMulticastIPv6Test extends DatagramMulticastIPv6Test {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
}
}

View file

@ -0,0 +1,32 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.testsuite.transport.socket.DatagramMulticastTest;
public class EpollDatagramMulticastIpv6WithIpv4AddrTest extends DatagramMulticastTest {
@Override
protected InternetProtocolFamily groupInternetProtocalFamily() {
return InternetProtocolFamily.IPv4;
}
@Override
protected InternetProtocolFamily socketInternetProtocalFamily() {
return InternetProtocolFamily.IPv6;
}
}

View file

@ -0,0 +1,29 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramMulticastTest;
import java.util.List;
public class EpollDatagramMulticastTest extends DatagramMulticastTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
}
}

View file

@ -0,0 +1,307 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.AbstractDatagramTest;
import io.netty.util.internal.PlatformDependent;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
@BeforeAll
public static void assumeRecvmmsgSupported() {
assumeTrue(Native.IS_SUPPORTING_RECVMMSG);
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.epollOnlyDatagram(internetProtocolFamily());
}
@Test
public void testScatteringReadPartial(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<Bootstrap, Bootstrap>() {
@Override
public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
testScatteringReadPartial(bootstrap, bootstrap2);
}
});
}
public void testScatteringReadPartial(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringRead(sb, cb, false, true);
}
@Test
public void testScatteringRead(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<Bootstrap, Bootstrap>() {
@Override
public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
testScatteringRead(bootstrap, bootstrap2);
}
});
}
public void testScatteringRead(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringRead(sb, cb, false, false);
}
@Test
public void testScatteringReadConnectedPartial(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<Bootstrap, Bootstrap>() {
@Override
public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
testScatteringReadConnectedPartial(bootstrap, bootstrap2);
}
});
}
public void testScatteringReadConnectedPartial(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringRead(sb, cb, true, true);
}
@Test
public void testScatteringConnectedRead(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<Bootstrap, Bootstrap>() {
@Override
public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
testScatteringConnectedRead(bootstrap, bootstrap2);
}
});
}
public void testScatteringConnectedRead(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringRead(sb, cb, true, false);
}
private void testScatteringRead(Bootstrap sb, Bootstrap cb, boolean connected, boolean partial) throws Throwable {
int packetSize = 8;
int numPackets = 4;
sb.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(
packetSize, packetSize * (partial ? numPackets / 2 : numPackets), 64 * 1024));
// Set the MAX_DATAGRAM_PAYLOAD_SIZE to something bigger then the actual packet size.
// This will allow us to check if we correctly thread the received len.
sb.option(EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, packetSize * 2);
Channel sc = null;
Channel cc = null;
try {
cb.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception {
// Nothing will be sent.
}
});
cc = cb.bind(newSocketAddress()).sync().channel();
final SocketAddress ccAddress = cc.localAddress();
final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(numPackets);
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
private long numRead;
private int counter;
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
assertTrue(counter > 1);
counter = 0;
ctx.read();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
assertEquals(ccAddress, msg.sender());
// Each packet contains a long which represent the write iteration.
assertEquals(8, msg.content().readableBytes());
assertEquals(numRead, msg.content().readLong());
numRead++;
counter++;
latch.countDown();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
errorRef.compareAndSet(null, cause);
}
});
sb.option(ChannelOption.AUTO_READ, false);
sc = sb.bind(newSocketAddress()).sync().channel();
if (connected) {
sc.connect(cc.localAddress()).syncUninterruptibly();
}
InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
List<ChannelFuture> futures = new ArrayList<ChannelFuture>(numPackets);
for (int i = 0; i < numPackets; i++) {
futures.add(cc.write(new DatagramPacket(cc.alloc().directBuffer().writeLong(i), addr)));
}
cc.flush();
for (ChannelFuture f: futures) {
f.sync();
}
// Enable autoread now which also triggers a read, this should cause scattering reads (recvmmsg) to happen.
sc.config().setAutoRead(true);
if (!latch.await(10, TimeUnit.SECONDS)) {
Throwable error = errorRef.get();
if (error != null) {
throw error;
}
fail("Timeout while waiting for packets");
}
} finally {
if (cc != null) {
cc.close().syncUninterruptibly();
}
if (sc != null) {
sc.close().syncUninterruptibly();
}
}
}
@Test
public void testScatteringReadWithSmallBuffer(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<Bootstrap, Bootstrap>() {
@Override
public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
testScatteringReadWithSmallBuffer(bootstrap, bootstrap2);
}
});
}
public void testScatteringReadWithSmallBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringReadWithSmallBuffer0(sb, cb, false);
}
@Test
public void testScatteringConnectedReadWithSmallBuffer(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<Bootstrap, Bootstrap>() {
@Override
public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
testScatteringConnectedReadWithSmallBuffer(bootstrap, bootstrap2);
}
});
}
public void testScatteringConnectedReadWithSmallBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringReadWithSmallBuffer0(sb, cb, true);
}
private void testScatteringReadWithSmallBuffer0(Bootstrap sb, Bootstrap cb, boolean connected) throws Throwable {
int packetSize = 16;
sb.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1400, 1400, 64 * 1024));
sb.option(EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, 1400);
Channel sc = null;
Channel cc = null;
try {
cb.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msgs) {
// Nothing will be sent.
}
});
cc = cb.bind(newSocketAddress()).sync().channel();
final SocketAddress ccAddress = cc.localAddress();
final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
final byte[] bytes = new byte[packetSize];
PlatformDependent.threadLocalRandom().nextBytes(bytes);
final CountDownLatch latch = new CountDownLatch(1);
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
assertEquals(ccAddress, msg.sender());
assertEquals(bytes.length, msg.content().readableBytes());
byte[] receivedBytes = new byte[bytes.length];
msg.content().readBytes(receivedBytes);
assertArrayEquals(bytes, receivedBytes);
latch.countDown();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
errorRef.compareAndSet(null, cause);
}
});
sc = sb.bind(newSocketAddress()).sync().channel();
if (connected) {
sc.connect(cc.localAddress()).syncUninterruptibly();
}
InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
cc.writeAndFlush(new DatagramPacket(cc.alloc().directBuffer().writeBytes(bytes), addr)).sync();
if (!latch.await(10, TimeUnit.SECONDS)) {
Throwable error = errorRef.get();
if (error != null) {
throw error;
}
fail("Timeout while waiting for packets");
}
} finally {
if (cc != null) {
cc.close().syncUninterruptibly();
}
if (sc != null) {
sc.close().syncUninterruptibly();
}
}
}
}

View file

@ -0,0 +1,29 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory;
import io.netty.testsuite.transport.socket.DatagramUnicastIPv6MappedTest;
import java.util.List;
public class EpollDatagramUnicastIPv6MappedTest extends DatagramUnicastIPv6MappedTest {
@Override
protected List<BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
}
}

View file

@ -0,0 +1,29 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramUnicastIPv6Test;
import java.util.List;
public class EpollDatagramUnicastIPv6Test extends DatagramUnicastIPv6Test {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
}
}

View file

@ -0,0 +1,190 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramUnicastInetTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
public class EpollDatagramUnicastTest extends DatagramUnicastInetTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagram(InternetProtocolFamily.IPv4);
}
public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable {
// Run this test with IP_RECVORIGDSTADDR option enabled
sb.option(EpollChannelOption.IP_RECVORIGDSTADDR, true);
super.testSimpleSendWithConnect(sb, cb);
sb.option(EpollChannelOption.IP_RECVORIGDSTADDR, false);
}
@Test
public void testSendSegmentedDatagramPacket(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<Bootstrap, Bootstrap>() {
@Override
public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
testSendSegmentedDatagramPacket(bootstrap, bootstrap2);
}
});
}
public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable {
testSegmentedDatagramPacket(sb, cb, false, false);
}
@Test
public void testSendSegmentedDatagramPacketComposite(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<Bootstrap, Bootstrap>() {
@Override
public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
testSendSegmentedDatagramPacketComposite(bootstrap, bootstrap2);
}
});
}
public void testSendSegmentedDatagramPacketComposite(Bootstrap sb, Bootstrap cb) throws Throwable {
testSegmentedDatagramPacket(sb, cb, true, false);
}
@Test
public void testSendAndReceiveSegmentedDatagramPacket(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<Bootstrap, Bootstrap>() {
@Override
public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
testSendAndReceiveSegmentedDatagramPacket(bootstrap, bootstrap2);
}
});
}
public void testSendAndReceiveSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable {
testSegmentedDatagramPacket(sb, cb, false, true);
}
@Test
public void testSendAndReceiveSegmentedDatagramPacketComposite(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<Bootstrap, Bootstrap>() {
@Override
public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
testSendAndReceiveSegmentedDatagramPacketComposite(bootstrap, bootstrap2);
}
});
}