菠菜网

赣州市房产网:简朴的Java实现Netty举行通讯

时间:4个月前   阅读:43

使用Java搭建一个简朴的Netty通讯例子

看过dubbo源码的同砚应该都清晰,使用dubbo协议的底层通讯是使用的netty举行交互,而最近看了dubbo的Netty部门后,自己写了个简朴的Netty通讯例子。

本文源地址:实现Netty举行通讯

准备

工程截图

模块详解

  • rpc-common

rpc-common作为各个模块都需使用的模块,工程中泛起的是一些通讯时请求的参数以及返回的参数,另有一些序列化的工具。

  • rpc-client

rpc-client中现在只是单单的一个NettyClient启动类。

  • rpc-server

rpc-client中现在也只是单单的一个NettyServer服务启动类。

需要的依赖

现在所有的依赖项都泛起在 rpc-common 下的 pom.xml中。

<dependencies>
    <!-- Netty -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.10.Final</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>

    <!-- Protostuff -->
    <dependency>
        <groupId>com.dyuproject.protostuff</groupId>
        <artifactId>protostuff-core</artifactId>
        <version>1.0.9</version>
    </dependency>

    <dependency>
        <groupId>com.dyuproject.protostuff</groupId>
        <artifactId>protostuff-runtime</artifactId>
        <version>1.0.9</version>
    </dependency>

    <!-- Objenesis -->
    <dependency>
        <groupId>org.objenesis</groupId>
        <artifactId>objenesis</artifactId>
        <version>2.1</version>
    </dependency>

    <!-- fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.38</version>
    </dependency>
</dependencies>

实现

首先我们在common中先界说本次的Request和Response的基类工具。

public class Request {

    private String requestId;

    private Object parameter;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Object getParameter() {
        return parameter;
    }

    public void setParameter(Object parameter) {
        this.parameter = parameter;
    }
}

public class Response {

    private String requestId;

    private Object result;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}

使用fastJson举行本次序列化

Netty工具的序列化转换很好懂, ByteToMessageDecoderMessageToByteEncoder 划分只要继续它们,重写方式后,获取到Object和Byte,各自转换就OK。

不外若是是有要用到生产上的同砚,建议不要使用 fastJson,由于它的漏洞补丁真的是太多了,可以使用google的 protostuff

public class RpcDecoder extends ByteToMessageDecoder {

    // 目的工具类型举行解码
    private Class<?> target;

    public RpcDecoder(Class target) {
        this.target = target;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {   // 不够长度抛弃
            return;
        }
        in.markReaderIndex();   // 符号一下当前的readIndex的位置
        int dataLength = in.readInt();  // 读取传送过来的新闻的长度。ByteBuf 的readInt()方式会让他的readIndex增添4

        if (in.readableBytes() < dataLength) {  // 读到的新闻体长度若是小于我们传送过来的新闻长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = JSON.parseObject(data, target);    // 将byte数据转化为我们需要的工具
        out.add(obj);
    }
}

public class RpcEncoder extends MessageToByteEncoder {

    //目的工具类型举行编码
    private Class<?> target;

    public RpcEncoder(Class target) {
        this.target = target;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if (target.isInstance(msg)) {
            byte[] data = JSON.toJSONBytes(msg);    // 使用fastJson将工具转换为byte
            out.writeInt(data.length);  // 先将新闻长度写入,也就是新闻头
            out.writeBytes(data);   // 新闻体中包罗我们要发送的数据
        }
    }

}

NetyServer

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;

        System.out.println("Client Data:" + JSON.toJSONString(request));

        Response response = new Response();
        response.setRequestId(request.getRequestId());
        response.setResult("Hello Client !");

        // client吸收到信息后自动关闭掉毗邻
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

public class NettyServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    private String ip;
    private int port;

    public NettyServer(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public void server() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            final ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new RpcDecoder(Request.class))
                                    .addLast(new RpcEncoder(Response.class))
                                    .addLast(new NettyServerHandler());
                        }
                    });

            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);  // 开启长毗邻

            ChannelFuture future = serverBootstrap.bind(ip, port).sync();

//            if (future.isSuccess()) {
//
//                new Register().register("/yanzhenyidai/com.yanzhenyidai.server", ip + ":" + port);
//            }

            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyServer("127.0.0.1", 20000).server();
    }
}

要害名词:

  • EventLoopGroup

    • workerGroup
    • bossGroup

    Server端的EventLoopGroup分为两个,一样平常workerGroup作为处置请求,bossGroup作为吸收请求。

  • ChannelOption

    • SO_BACKLOG
    • SO_SNDBUF
    • SO_RCVBUF
    • SO_KEEPALIVE

    以上四个常量作为TCP毗邻中的属性。

  • ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);

    NettyServerHandler中泛起的 ChannelFutureListener.CLOSE ,作为Server端自动关闭与Client端的通讯,若是没有自动Close,那么NettyClient将会一直处于阻塞状态,得不到NettyServer的返回信息。

NettyClient

public class NettyClient extends SimpleChannelInboundHandler<Response> {

    private final String ip;
    private final int port;
    private Response response;

    public NettyClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
        this.response = response;
    }

    public Response client(Request request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {

            // 建立并初始化 Netty 客户端 Bootstrap 工具
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();

                    pipeline.addLast(new RpcDecoder(Response.class));
                    pipeline.addLast(new RpcEncoder(Request.class));
                    pipeline.addLast(NettyClient.this);
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);


//            String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":");

            // 毗邻 RPC 服务器
            ChannelFuture future = bootstrap.connect(ip, port).sync();

            // 写入 RPC 请求数据并关闭毗邻
            Channel channel = future.channel();

            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();

            return response;
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        Request request = new Request();
        request.setRequestId(UUID.randomUUID().toString());
        request.setParameter("Hello Server !");
        System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 30000).client(request)));
    }
}

测试

若是以上所有内容都准备就绪,那么就可以举行调试了。

启动顺序,先启动NettyServer,再启动NettyClient。

总结

记得刚出来事情时,有事情很多年的同事问我了不领会Netty,那时事情太短,直说听过Putty,现在回想起来真的挺丢人的,哈哈。

Netty作为通讯框架,若是你领会TCP,而且项目中有类似传输信息的需求,又不想集成HTTP或者Socket,那么Netty真的挺适用的。

参考资料:

Dubbo-Netty

Netty.io

本项目Github地址:Netty-RPC

,

sunbet

Sunbet www.ipvps.cn致力于打造申博娱乐平台,门下的申博打造拥有最让消费者更安心体验环境!Sunbet一个让您宾至如归的老牌网站!

上一篇:九江租房网:高校复学后这样上课成常态!教育部最新亮相

下一篇:皇冠官网app:〔艺术文化〕乐兴之时岁末祈福音乐会 贝九歌颂生命之美

网友评论