使用Java搭建一个简朴的Netty通讯例子
看过dubbo源码的同砚应该都清晰,使用dubbo协议的底层通讯是使用的netty举行交互,而最近看了dubbo的Netty部门后,自己写了个简朴的Netty通讯例子。
本文源地址:实现Netty举行通讯
准备
工程截图
模块详解
- rpc-common
rpc-common作为各个模块都需使用的模块,工程中泛起的是一些通讯时请求的参数以及返回的参数,另有一些序列化的工具。
- rpc-client
rpc-client中现在只是单单的一个NettyClient启动类。
- rpc-server
rpc-client中现在也只是单单的一个NettyServer服务启动类。
需要的依赖
现在所有的依赖项都泛起在 rpc-common 下的 pom.xml中。
<dependencies>
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.10.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<!-- Protostuff -->
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.0.9</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.0.9</version>
</dependency>
<!-- Objenesis -->
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.38</version>
</dependency>
</dependencies>
实现
首先我们在common中先界说本次的Request和Response的基类工具。
public class Request {
private String requestId;
private Object parameter;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Object getParameter() {
return parameter;
}
public void setParameter(Object parameter) {
this.parameter = parameter;
}
}
public class Response {
private String requestId;
private Object result;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
使用fastJson举行本次序列化
Netty工具的序列化转换很好懂, ByteToMessageDecoder
和 MessageToByteEncoder
划分只要继续它们,重写方式后,获取到Object和Byte,各自转换就OK。
不外若是是有要用到生产上的同砚,建议不要使用 fastJson
,由于它的漏洞补丁真的是太多了,可以使用google的 protostuff
。
public class RpcDecoder extends ByteToMessageDecoder {
// 目的工具类型举行解码
private Class<?> target;
public RpcDecoder(Class target) {
this.target = target;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) { // 不够长度抛弃
return;
}
in.markReaderIndex(); // 符号一下当前的readIndex的位置
int dataLength = in.readInt(); // 读取传送过来的新闻的长度。ByteBuf 的readInt()方式会让他的readIndex增添4
if (in.readableBytes() < dataLength) { // 读到的新闻体长度若是小于我们传送过来的新闻长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = JSON.parseObject(data, target); // 将byte数据转化为我们需要的工具
out.add(obj);
}
}
public class RpcEncoder extends MessageToByteEncoder {
//目的工具类型举行编码
private Class<?> target;
public RpcEncoder(Class target) {
this.target = target;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (target.isInstance(msg)) {
byte[] data = JSON.toJSONBytes(msg); // 使用fastJson将工具转换为byte
out.writeInt(data.length); // 先将新闻长度写入,也就是新闻头
out.writeBytes(data); // 新闻体中包罗我们要发送的数据
}
}
}
NetyServer
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request) msg;
System.out.println("Client Data:" + JSON.toJSONString(request));
Response response = new Response();
response.setRequestId(request.getRequestId());
response.setResult("Hello Client !");
// client吸收到信息后自动关闭掉毗邻
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private String ip;
private int port;
public NettyServer(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void server() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32 * 1024)
.option(ChannelOption.SO_RCVBUF, 32 * 1024)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new RpcDecoder(Request.class))
.addLast(new RpcEncoder(Response.class))
.addLast(new NettyServerHandler());
}
});
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 开启长毗邻
ChannelFuture future = serverBootstrap.bind(ip, port).sync();
// if (future.isSuccess()) {
//
// new Register().register("/yanzhenyidai/com.yanzhenyidai.server", ip + ":" + port);
// }
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new NettyServer("127.0.0.1", 20000).server();
}
}
要害名词:
-
EventLoopGroup
- workerGroup
- bossGroup
Server端的EventLoopGroup分为两个,一样平常workerGroup作为处置请求,bossGroup作为吸收请求。
-
ChannelOption
- SO_BACKLOG
- SO_SNDBUF
- SO_RCVBUF
- SO_KEEPALIVE
以上四个常量作为TCP毗邻中的属性。
-
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
NettyServerHandler中泛起的
ChannelFutureListener.CLOSE
,作为Server端自动关闭与Client端的通讯,若是没有自动Close,那么NettyClient将会一直处于阻塞状态,得不到NettyServer的返回信息。
NettyClient
public class NettyClient extends SimpleChannelInboundHandler<Response> {
private final String ip;
private final int port;
private Response response;
public NettyClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
this.response = response;
}
public Response client(Request request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
// 建立并初始化 Netty 客户端 Bootstrap 工具
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new RpcDecoder(Response.class));
pipeline.addLast(new RpcEncoder(Request.class));
pipeline.addLast(NettyClient.this);
}
});
bootstrap.option(ChannelOption.TCP_NODELAY, true);
// String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":");
// 毗邻 RPC 服务器
ChannelFuture future = bootstrap.connect(ip, port).sync();
// 写入 RPC 请求数据并关闭毗邻
Channel channel = future.channel();
channel.writeAndFlush(request).sync();
channel.closeFuture().sync();
return response;
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
Request request = new Request();
request.setRequestId(UUID.randomUUID().toString());
request.setParameter("Hello Server !");
System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 30000).client(request)));
}
}
测试
若是以上所有内容都准备就绪,那么就可以举行调试了。
启动顺序,先启动NettyServer,再启动NettyClient。
总结
记得刚出来事情时,有事情很多年的同事问我了不领会Netty,那时事情太短,直说听过Putty,现在回想起来真的挺丢人的,哈哈。
Netty作为通讯框架,若是你领会TCP,而且项目中有类似传输信息的需求,又不想集成HTTP或者Socket,那么Netty真的挺适用的。
参考资料:
Dubbo-Netty
Netty.io
本项目Github地址:Netty-RPC
,Sunbet www.9cx.net致力于打造申博娱乐平台,门下的申博打造拥有最让消费者更安心体验环境!Sunbet一个让您宾至如归的老牌网站!