Netty网络编程自用资料 | 字数总计: 2.3k | 阅读时长: 12分钟 | 阅读量: 
gitee:https://gitee.com/niumazlb/mynettydemo https://github.com/ishuaige/mynettydemo 
服务端 这里用 netty+WebSocket 举例
创建 ServerBootstrap - bootstrap 
bootstrap 关联两个事件循环组(EventLoopGroup) boss 和 work 
bootstrap 配置项 option 
bootstrap 绑定 channel 
bootstrap 绑定 handler,这里可以做一个封装 
bootstrap 绑定 绑定端口 
bootstrap 绑定监听关闭连接事件,处理 boss 和 work 的关闭 
 
以上 2-5 顺序随意
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import  com.niuma.mynetty.server.handler.MyWebSocketChannelHandler;import  io.netty.bootstrap.ServerBootstrap;import  io.netty.channel.*;import  io.netty.channel.nio.NioEventLoopGroup;import  io.netty.channel.socket.nio.NioServerSocketChannel;import  org.springframework.stereotype.Component;import  javax.annotation.Resource;@ChannelHandler .Sharable    @Component      public  class  WebSocketServer  {         EventLoopGroup  boss  =  new  NioEventLoopGroup ();         EventLoopGroup  work  =  new  NioEventLoopGroup ();         @Resource          MyWebSocketChannelHandler myWebSocketChannelHandler;         public  void  run ()  {             try  {                 ServerBootstrap  bootstrap  =  new  ServerBootstrap ();                 bootstrap.group(boss, work);                 bootstrap.option(ChannelOption.SO_BACKLOG, 1024 );                 bootstrap.channel(NioServerSocketChannel.class);                                  bootstrap.childHandler(myWebSocketChannelHandler);                 Channel  channel  =  bootstrap.bind(8888 ).sync().channel();                 channel.closeFuture().addListener(future -> {                     boss.shutdownGracefully();                     work.shutdownGracefully();                 });             } catch  (InterruptedException e) {                 e.printStackTrace();             }         }     } 
注意 Handler 的继承喔
这里的 Handler 是处理的 websocket 的,其他需要具体情况具体分析 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 package  com.niuma.mynetty.server.handler;import  com.niuma.mynetty.config.NettyConfig;import  io.netty.channel.ChannelHandlerContext;import  io.netty.channel.ChannelInitializer;import  io.netty.channel.socket.SocketChannel;import  io.netty.handler.codec.http.HttpObjectAggregator;import  io.netty.handler.codec.http.HttpServerCodec;import  io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import  io.netty.handler.logging.LogLevel;import  io.netty.handler.logging.LoggingHandler;import  io.netty.handler.stream.ChunkedWriteHandler;import  org.springframework.stereotype.Component;import  javax.annotation.Resource;@Component public  class  MyWebSocketChannelHandler  extends  ChannelInitializer <SocketChannel> {    @Resource      MyWebSocketHandler myWebSocketHandler ;     @Resource      RegisterHandler registerHandler ;     @Resource      SingleMessageHandler singleMessageHandler ;     @Override      protected  void  initChannel (SocketChannel ch)  throws  Exception {                           ch.pipeline().addLast("http-codec" , new  HttpServerCodec ())                                  .addLast("log" ,new  LoggingHandler (LogLevel.DEBUG))                                  .addLast("aggregator" ,new  HttpObjectAggregator (65536 ))                                   .addLast("http-chunked" ,new  ChunkedWriteHandler ())                                    .addLast("protocolHandler" ,new  WebSocketServerProtocolHandler ("/websocket" ))                                                   .addLast(myWebSocketHandler)                                  .addLast(registerHandler)                 .addLast(singleMessageHandler);     }          @Override      public  void  channelActive (ChannelHandlerContext ctx)  throws  Exception {         NettyConfig.group.add(ctx.channel());         System.out.println("客户端与服务端连接开启...." );     }          @Override      public  void  channelInactive (ChannelHandlerContext ctx)  throws  Exception {         NettyConfig.group.remove(ctx.channel());         System.out.println("客户端与服务端连接关闭...." );     }          @Override      public  void  channelReadComplete (ChannelHandlerContext ctx)  throws  Exception {         ctx.flush();     }          @Override      public  void  exceptionCaught (ChannelHandlerContext ctx, Throwable cause)  throws  Exception {         cause.printStackTrace();         ctx.close();     } } 
简单的 RPC 框架 参考黑马程序员 netty 课程
协议 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package  cn.itcast.protocol;import  cn.itcast.config.Config;import  cn.itcast.message.Message;import  io.netty.buffer.ByteBuf;import  io.netty.channel.ChannelHandler;import  io.netty.channel.ChannelHandlerContext;import  io.netty.handler.codec.MessageToMessageCodec;import  lombok.extern.slf4j.Slf4j;import  java.util.List;@Slf4j @ChannelHandler .Sharablepublic  class  MessageCodecSharable  extends  MessageToMessageCodec <ByteBuf, Message> {    @Override      public  void  encode (ChannelHandlerContext ctx, Message msg, List<Object> outList)  throws  Exception {         ByteBuf  out  =  ctx.alloc().buffer();                  out.writeBytes(new  byte []{1 , 2 , 3 , 4 });                  out.writeByte(1 );                  out.writeByte(Config.getSerializerAlgorithm().ordinal());                  out.writeByte(msg.getMessageType());                  out.writeInt(msg.getSequenceId());                  out.writeByte(0xff );                  byte [] bytes = Config.getSerializerAlgorithm().serialize(msg);                  out.writeInt(bytes.length);                  out.writeBytes(bytes);         outList.add(out);     }     @Override      protected  void  decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out)  throws  Exception {         int  magicNum  =  in.readInt();         byte  version  =  in.readByte();         byte  serializerAlgorithm  =  in.readByte();          byte  messageType  =  in.readByte();          int  sequenceId  =  in.readInt();         in.readByte();         int  length  =  in.readInt();         byte [] bytes = new  byte [length];         in.readBytes(bytes, 0 , length);                  Serializer.Algorithm  algorithm  =  Serializer.Algorithm.values()[serializerAlgorithm];                  Class<? extends  Message > messageClass = Message.getMessageClass(messageType);         Message  message  =  algorithm.deserialize(messageClass, bytes);         out.add(message);     } } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package  cn.itcast.protocol;import  io.netty.handler.codec.LengthFieldBasedFrameDecoder;public  class  ProtocolFrameDecoder  extends  LengthFieldBasedFrameDecoder  {    public  ProtocolFrameDecoder ()  {         this (1024 , 12 , 4 , 0 , 0 );     }     public  ProtocolFrameDecoder (int  maxFrameLength, int  lengthFieldOffset, int  lengthFieldLength, int  lengthAdjustment, int  initialBytesToStrip)  {         super (maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);     } } 
服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package  cn.itcast.server;import  cn.itcast.protocol.MessageCodecSharable;import  cn.itcast.protocol.ProtocolFrameDecoder;import  cn.itcast.server.handler.RpcRequestMessageHandler;import  io.netty.bootstrap.ServerBootstrap;import  io.netty.channel.Channel;import  io.netty.channel.ChannelInitializer;import  io.netty.channel.nio.NioEventLoopGroup;import  io.netty.channel.socket.SocketChannel;import  io.netty.channel.socket.nio.NioServerSocketChannel;import  io.netty.handler.logging.LogLevel;import  io.netty.handler.logging.LoggingHandler;import  lombok.extern.slf4j.Slf4j;@Slf4j public  class  RpcServer  {    public  static  void  main (String[] args)  {         NioEventLoopGroup  boss  =  new  NioEventLoopGroup ();         NioEventLoopGroup  worker  =  new  NioEventLoopGroup ();                  LoggingHandler  LOGGING_HANDLER  =  new  LoggingHandler (LogLevel.DEBUG);                  MessageCodecSharable  MESSAGE_CODEC  =  new  MessageCodecSharable ();                  RpcRequestMessageHandler  RPC_HANDLER  =  new  RpcRequestMessageHandler ();         try  {             ServerBootstrap  serverBootstrap  =  new  ServerBootstrap ();             serverBootstrap.channel(NioServerSocketChannel.class);             serverBootstrap.group(boss, worker);             serverBootstrap.childHandler(new  ChannelInitializer <SocketChannel>() {                 @Override                  protected  void  initChannel (SocketChannel ch)  throws  Exception {                                          ch.pipeline().addLast(new  ProtocolFrameDecoder ());                     ch.pipeline().addLast(LOGGING_HANDLER);                     ch.pipeline().addLast(MESSAGE_CODEC);                     ch.pipeline().addLast(RPC_HANDLER);                 }             });             Channel  channel  =  serverBootstrap.bind(8080 ).sync().channel();             channel.closeFuture().sync();         } catch  (InterruptedException e) {             log.error("server error" , e);         } finally  {             boss.shutdownGracefully();             worker.shutdownGracefully();         }     } } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package  cn.itcast.server.handler;import  cn.itcast.message.RpcRequestMessage;import  cn.itcast.message.RpcResponseMessage;import  cn.itcast.server.service.ServicesFactory;import  io.netty.channel.ChannelHandlerContext;import  io.netty.channel.ChannelInboundHandlerAdapter;import  io.netty.channel.SimpleChannelInboundHandler;import  java.lang.reflect.InvocationTargetException;import  java.lang.reflect.Method;public  class  RpcRequestMessageHandler  extends  SimpleChannelInboundHandler <RpcRequestMessage> {    @Override      protected  void  channelRead0 (ChannelHandlerContext ctx, RpcRequestMessage msg)  throws  Exception {         String  interfaceName  =  msg.getInterfaceName();         String  methodName  =  msg.getMethodName();         int  sequenceId  =  msg.getSequenceId();         Object[] parameterValue = msg.getParameterValue();         Class[] parameterTypes = msg.getParameterTypes();         RpcResponseMessage  response  =  new  RpcResponseMessage ();         response.setSequenceId(sequenceId);                  try  {             Object  service  =  ServicesFactory.getService(Class.forName(interfaceName));             Method  method  =  service.getClass().getMethod(methodName,parameterTypes);             Object  invoke  =  method.invoke(service, parameterValue);             response.setReturnValue(invoke);         } catch  (Exception e) {             e.printStackTrace();             response.setExceptionValue(new  Exception ("error:" +e.getCause().getMessage()));         }         ctx.writeAndFlush(response);     } } 
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 package  cn.itcast.client;import  cn.itcast.client.handler.RpcResponseMessageHandler;import  cn.itcast.message.RpcRequestMessage;import  cn.itcast.protocol.MessageCodecSharable;import  cn.itcast.protocol.ProtocolFrameDecoder;import  cn.itcast.protocol.SequenceIdGenerator;import  cn.itcast.server.service.HelloService;import  io.netty.bootstrap.Bootstrap;import  io.netty.channel.Channel;import  io.netty.channel.ChannelInitializer;import  io.netty.channel.nio.NioEventLoopGroup;import  io.netty.channel.socket.SocketChannel;import  io.netty.channel.socket.nio.NioSocketChannel;import  io.netty.handler.logging.LogLevel;import  io.netty.handler.logging.LoggingHandler;import  io.netty.util.concurrent.DefaultPromise;import  java.lang.reflect.Proxy;public  class  RpcClientManager  {    public  static  void  main (String[] args)  {         HelloService  proxyService  =  getProxyService(HelloService.class);         System.out.println(proxyService.sayHello("zhangsan" ));     }     public  static  <T> T getProxyService (Class<T> serviceClass)  {         ClassLoader  loader  =  serviceClass.getClassLoader();         Class<?>[] interfaces = new  Class []{serviceClass};         Object  o  =  Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {             int  sequenceId  =  SequenceIdGenerator.nextId();             RpcRequestMessage  message  =  new  RpcRequestMessage (                     sequenceId,                     serviceClass.getName(),                     method.getName(),                     method.getReturnType(),                     method.getParameterTypes(),                     args             );             getChannel().writeAndFlush(message);             DefaultPromise<Object> promise = new  DefaultPromise <>(getChannel().eventLoop());             RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);             promise.await();             if  (promise.isSuccess()) {                 return  promise.getNow();             } else  {                 throw  new  RuntimeException (promise.cause());             }         });         return  (T) o;     }     private  static  Channel  channel  =  null ;     private  static  final  Object  LOCK  =  new  Object ();     public  static  Channel getChannel ()  {         if  (channel != null ) {             return  channel;         }         synchronized  (LOCK) {             if  (channel != null ) {                 return  channel;             }             initChannel();             return  channel;         }     }     private  static  void  initChannel ()  {         NioEventLoopGroup  group  =  new  NioEventLoopGroup ();         LoggingHandler  LOGGING_HANDLER  =  new  LoggingHandler (LogLevel.DEBUG);         MessageCodecSharable  MESSAGE_CODEC  =  new  MessageCodecSharable ();         RpcResponseMessageHandler  RPC_HANDLER  =  new  RpcResponseMessageHandler ();         Bootstrap  bootstrap  =  new  Bootstrap ();         bootstrap.channel(NioSocketChannel.class);         bootstrap.group(group);         bootstrap.handler(new  ChannelInitializer <SocketChannel>() {             @Override              protected  void  initChannel (SocketChannel ch)  throws  Exception {                 ch.pipeline().addLast(new  ProtocolFrameDecoder ());                 ch.pipeline().addLast(LOGGING_HANDLER);                 ch.pipeline().addLast(MESSAGE_CODEC);                 ch.pipeline().addLast(RPC_HANDLER);             }         });         try  {             channel = bootstrap.connect("localhost" , 8080 ).sync().channel();             channel.closeFuture().addListener(future -> {                 group.shutdownGracefully();             });         } catch  (InterruptedException e) {             e.printStackTrace();         }     } } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package  cn.itcast.client.handler;import  cn.itcast.message.RpcResponseMessage;import  io.netty.channel.ChannelHandler;import  io.netty.channel.ChannelHandlerContext;import  io.netty.channel.SimpleChannelInboundHandler;import  io.netty.util.concurrent.Promise;import  lombok.extern.slf4j.Slf4j;import  java.util.Map;import  java.util.concurrent.ConcurrentHashMap;@Slf4j @ChannelHandler .Sharablepublic  class  RpcResponseMessageHandler  extends  SimpleChannelInboundHandler <RpcResponseMessage> {         public  static  final  Map<Integer, Promise<Object>> PROMISES = new  ConcurrentHashMap <>();     @Override      protected  void  channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg)  throws  Exception {         log.debug("{}" , msg);         int  sequenceId  =  msg.getSequenceId();         Object  returnValue  =  msg.getReturnValue();         Exception  exceptionValue  =  msg.getExceptionValue();         Promise<Object> promise = PROMISES.remove(sequenceId);         if (promise != null ){             if (exceptionValue != null ){                 promise.setFailure(exceptionValue);             }else  {                 promise.setSuccess(returnValue);             }         }     } } 
消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package  cn.itcast.message;import  lombok.Data;import  java.io.Serializable;import  java.util.HashMap;import  java.util.Map;@Data public  abstract  class  Message  implements  Serializable  {    public  static  Class<?> getMessageClass(int  messageType) {         return  messageClasses.get(messageType);     }     private  int  sequenceId;     private  int  messageType;     public  abstract  int  getMessageType () ;          public  static  final  int  RPC_MESSAGE_TYPE_REQUEST  =  101 ;          public  static  final  int   RPC_MESSAGE_TYPE_RESPONSE  =  102 ;     private  static  final  Map<Integer, Class<?>> messageClasses = new  HashMap <>();     static  {         messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);         messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);     } } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package  cn.itcast.message;import  lombok.Getter;import  lombok.ToString;@Getter @ToString(callSuper = true) public  class  RpcRequestMessage  extends  Message  {         private  String interfaceName;          private  String methodName;          private  Class<?> returnType;          private  Class[] parameterTypes;          private  Object[] parameterValue;     public  RpcRequestMessage (int  sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue)  {         super .setSequenceId(sequenceId);         this .interfaceName = interfaceName;         this .methodName = methodName;         this .returnType = returnType;         this .parameterTypes = parameterTypes;         this .parameterValue = parameterValue;     }     @Override      public  int  getMessageType ()  {         return  RPC_MESSAGE_TYPE_REQUEST;     } } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package  cn.itcast.message;import  lombok.Data;import  lombok.ToString;@Data @ToString(callSuper = true) public  class  RpcResponseMessage  extends  Message  {         private  Object returnValue;          private  Exception exceptionValue;     @Override      public  int  getMessageType ()  {         return  RPC_MESSAGE_TYPE_RESPONSE;     } }