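First, this is a demo that uses the Netty framework and communicates over UDP.
The main classes are listed below. There is a fair amount of code; the ones to focus on are class 3 and class 5.
1. GetDataHandlerAdapter, which counts the clients (one received message each) and logs the running count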
public class GetDataHandlerAdapter extends ChannelInboundHandlerAdapter {
    private static Logger logger = LogManager.getLogger(GetDataHandlerAdapter.class);
    private static volatile AtomicInteger count = new AtomicInteger(1);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info(count.getAndIncrement());
    }
}
2. Server: RPCAgentServer
public class RPCAgentServer {
    private GetDataHandlerAdapter getDataHandlerAdapter = new GetDataHandlerAdapter();

    public void listen(int port) throws InterruptedException {
        Bootstrap serverBootstrap = new Bootstrap();
        serverBootstrap.handler(new ChannelInitializer<DatagramChannel>() {
            @Override
            public void initChannel(DatagramChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(getDataHandlerAdapter);
            }
        });
        serverBootstrap.group(new NioEventLoopGroup(1))
                .channel(NioDatagramChannel.class)
                .bind(port)
                .await();
    }
}
3. Client: RPCAgentClient
public class RPCAgentClient {
    private String remoteHost;
    private int remotePort;
    private int localPort;
    private Channel channel;

    public static RPCAgentClient getNewClient(String remoteHost, int remotePort) {
        RPCAgentClient rpcAgentClient = new RPCAgentClient();
        rpcAgentClient.remoteHost = remoteHost;
        rpcAgentClient.remotePort = remotePort;
        rpcAgentClient.localPort = new Random().nextInt(15000) + 10000;
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(new NioEventLoopGroup(1))
                    .channel(NioDatagramChannel.class)
                    .handler(new ChannelInboundHandlerAdapter());
            bootstrap.localAddress(rpcAgentClient.localPort);
            rpcAgentClient.channel = bootstrap.bind().channel();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return rpcAgentClient;
    }

    public void close() {
        if (channel != null) {
            channel.close();
        }
    }

    public void sendMsg(String msg) {
        try {
            channel.writeAndFlush(buildUdpMsg(remoteHost, remotePort, msg));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static DatagramPacket buildUdpMsg(String remoteHost, int remotePort, String msg) {
        return new DatagramPacket(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8),
                new InetSocketAddress(remoteHost, remotePort));
    }
}
4. Server test class: NettyServerTest
public class NettyServerTest {
    public static final int port = 8289;

    public static void main(String[] args) {
        try {
            new RPCAgentServer().listen(port);
            System.out.println(" service start finish ");
            System.out.println(new Scanner(System.in).next());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
5. Client test class: NettyClientTest, which starts 30 clients and sends one message from each
public class NettyClientTest {
    private static final String remoteHost = "127.0.0.1";

    public static void main(String[] args) {
        System.out.println("start start devices");
        ExecutorService executorService = Executors.newFixedThreadPool(30);
        for (int i = 0; i < 30; i++) {
            executorService.execute(() -> {
                RPCAgentClient rpcAgentClient = RPCAgentClient.getNewClient(remoteHost, NettyServerTest.port);
                rpcAgentClient.sendMsg("1");
                rpcAgentClient.close();
            });
        }
        try {
            executorService.shutdown();
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("start devices finish");
    }
}
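Start NettyServerTest first, then NettyClientTest, and watch the count that NettyServerTest prints.
With the code as shown, NettyServerTest prints the correct count, 30.
But if I add the synchronized keyword to RPCAgentClient's getNewClient method (don't ask me why I would), the method signature becomes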
public synchronized static RPCAgentClient getNewClient(String remoteHost, int remotePort)
and the count printed by NettyServerTest drops below 30; on my machine it is below 20.
So the question is: why?
1
sagaxu 2019-05-30 00:22:21 +08:00 via Android
It has nothing to do with synchronized; Netty is being used incorrectly.
2
cookii 2019-05-30 09:10:27 +08:00 1
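The first reply is right, it has nothing to do with synchronized. In Netty, establishing a connection is an asynchronous operation, and you published the channel without waiting for it to complete.
The fix is to change rpcAgentClient.channel = bootstrap.bind().channel(); in getNewClient to rpcAgentClient.channel = bootstrap.bind().sync().channel();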
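For anyone who wants to see the suggested change in context, here is a minimal sketch against Netty 4.x. SyncBoundUdpClient, open and isActive are hypothetical names, not the original RPCAgentClient. Binding to port 0 (letting the OS pick a free local port) and shutting down the event loop group in close() are my own assumptions rather than part of the fix itself.

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;

// Hypothetical stand-in for RPCAgentClient; names and layout are assumptions.
class SyncBoundUdpClient {
    private final EventLoopGroup group;
    private final Channel channel;
    private final InetSocketAddress remote;

    private SyncBoundUdpClient(EventLoopGroup group, Channel channel, InetSocketAddress remote) {
        this.group = group;
        this.channel = channel;
        this.remote = remote;
    }

    // Call from an application thread, not from a Netty event loop: sync() blocks.
    static SyncBoundUdpClient open(String remoteHost, int remotePort) throws InterruptedException {
        InetSocketAddress remote = new InetSocketAddress(remoteHost, remotePort);
        EventLoopGroup group = new NioEventLoopGroup(1);
        boolean bound = false;
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioDatagramChannel.class)
                    .handler(new ChannelInboundHandlerAdapter());
            // bind() only starts the bind; sync() waits for it to finish and
            // rethrows the failure if the bind did not succeed.
            // Port 0 is an assumption: the OS chooses a free local port.
            Channel channel = bootstrap.bind(0).sync().channel();
            bound = true;
            return new SyncBoundUdpClient(group, channel, remote);
        } finally {
            if (!bound) {
                group.shutdownGracefully(); // do not leak the event loop on failure
            }
        }
    }

    boolean isActive() {
        return channel.isActive();
    }

    ChannelFuture sendMsg(String msg) {
        return channel.writeAndFlush(
                new DatagramPacket(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8), remote));
    }

    void close() {
        channel.close().syncUninterruptibly(); // wait until the channel is really closed
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws InterruptedException {
        // 8289 is the port used by NettyServerTest in the question.
        SyncBoundUdpClient client = SyncBoundUdpClient.open("127.0.0.1", 8289);
        System.out.println("bound and active: " + client.isActive());
        client.sendMsg("1");
        client.close();
    }
}
```

The only line that matters for the question is the one with sync(); everything else is scaffolding to make the sketch compile on its own.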
3
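ColoThor OP @imzhoukunqiang Thanks, that was indeed the problem. I looked at the source: the channel is created with new, so it being non-null does not mean it was set up successfully. But I still don't understand why adding synchronized makes the problem more likely to show up.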
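The "non-null does not mean bound" point can be seen without Netty at all. Below is a small JDK-only sketch using java.nio.channels.DatagramChannel, the class that Netty's NioDatagramChannel wraps. It is an analogy, not a trace of Netty's internals, and UnboundChannelDemo and hasLocalAddress are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;

// Hypothetical demo class: shows that a channel object exists before it is bound.
class UnboundChannelDemo {
    // getLocalAddress() returns null while the channel's socket is not bound.
    static boolean hasLocalAddress(DatagramChannel ch) throws IOException {
        return ch.getLocalAddress() != null;
    }

    public static void main(String[] args) throws IOException {
        try (DatagramChannel ch = DatagramChannel.open()) {
            // The object is non-null and open, but nothing has been bound yet.
            System.out.println("open=" + ch.isOpen() + " bound=" + hasLocalAddress(ch));

            // Port 0 lets the OS choose a free local port.
            ch.bind(new InetSocketAddress("127.0.0.1", 0));
            System.out.println("open=" + ch.isOpen() + " bound=" + hasLocalAddress(ch));
        }
    }
}
```

The first line printed should report the channel as open but not bound, and the second as bound. In the Netty version the bind happens later on the event loop thread, which is why the caller has to wait on the ChannelFuture before treating the channel as ready.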
4
cookii 2019-05-30 14:39:17 +08:00 1
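@ColoThor My guess is that synchronized causes thread switching. For an unsafe operation like this, discussing synchronized is not very meaningful, so there is no need to dwell on it.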
5
ColoThor OP @imzhoukunqiang OK, thanks for the answer.