Netty网络编程自用资料 | 字数总计: 2.3k | 阅读时长: 12分钟 | 阅读量:
gitee:https://gitee.com/niumazlb/mynettydemo github:https://github.com/ishuaige/mynettydemo
服务端:这里用 Netty + WebSocket 举例。首先我们需要一个服务端 WebSocketServer,创建服务端的大致步骤如下:
创建 ServerBootstrap - bootstrap
bootstrap 关联两个事件循环组(EventLoopGroup) boss 和 work
bootstrap 配置项 option
bootstrap 绑定 channel
bootstrap 绑定 handler,这里可以做一个封装
bootstrap 绑定端口
bootstrap 绑定监听关闭连接事件,处理 boss 和 work 的关闭
以上 2-5 顺序随意
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import com.niuma.mynetty.server.handler.MyWebSocketChannelHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import org.springframework.stereotype.Component;import javax.annotation.Resource;@ChannelHandler .Sharable @Component public class WebSocketServer { EventLoopGroup boss = new NioEventLoopGroup (); EventLoopGroup work = new NioEventLoopGroup (); @Resource MyWebSocketChannelHandler myWebSocketChannelHandler; public void run () { try { ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.group(boss, work); bootstrap.option(ChannelOption.SO_BACKLOG, 1024 ); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(myWebSocketChannelHandler); Channel channel = bootstrap.bind(8888 ).sync().channel(); channel.closeFuture().addListener(future -> { boss.shutdownGracefully(); work.shutdownGracefully(); }); } catch (InterruptedException e) { e.printStackTrace(); } } }
注意 Handler 的继承喔
这里的 Handler 是用来处理 WebSocket 的,其他场景需要具体情况具体分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 package com.niuma.mynetty.server.handler;import com.niuma.mynetty.config.NettyConfig;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.stream.ChunkedWriteHandler;import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component public class MyWebSocketChannelHandler extends ChannelInitializer <SocketChannel> { @Resource MyWebSocketHandler myWebSocketHandler ; @Resource RegisterHandler registerHandler ; @Resource SingleMessageHandler singleMessageHandler ; @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec" , new HttpServerCodec ()) .addLast("log" ,new LoggingHandler (LogLevel.DEBUG)) .addLast("aggregator" ,new HttpObjectAggregator (65536 )) .addLast("http-chunked" ,new ChunkedWriteHandler ()) .addLast("protocolHandler" ,new WebSocketServerProtocolHandler ("/websocket" )) .addLast(myWebSocketHandler) .addLast(registerHandler) .addLast(singleMessageHandler); } @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { NettyConfig.group.add(ctx.channel()); System.out.println("客户端与服务端连接开启...." ); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { NettyConfig.group.remove(ctx.channel()); System.out.println("客户端与服务端连接关闭...." 
); } @Override public void channelReadComplete (ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
简单的 RPC 框架 参考黑马程序员 netty 课程
// Section: Protocol
package cn.itcast.protocol;

import cn.itcast.config.Config;
import cn.itcast.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * Codec translating between {@link Message} objects and the binary wire format.
 *
 * <p>Frame layout written by {@link #encode} and read back by {@link #decode}:
 * <pre>
 *   4 bytes  magic number (1, 2, 3, 4)
 *   1 byte   version (1)
 *   1 byte   serializer algorithm (enum ordinal)
 *   1 byte   message type
 *   4 bytes  sequence id
 *   1 byte   padding (0xff), bringing the header to 12 bytes before the length field
 *   4 bytes  body length
 *   N bytes  serialized message body
 * </pre>
 *
 * <p>{@code decode} reads the header fields unconditionally, so it expects a complete
 * frame; it is meant to be placed after a frame decoder in the pipeline.
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {

    /**
     * Serializes {@code msg} and writes one complete frame into a new buffer.
     *
     * @param ctx     channel context used to allocate the output buffer
     * @param msg     message to encode
     * @param outList receives the encoded buffer
     */
    @Override
    public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        // Serialize before allocating so a serialization failure cannot leak the buffer.
        byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);

        ByteBuf out = ctx.alloc().buffer();
        out.writeBytes(new byte[]{1, 2, 3, 4});                    // magic number
        out.writeByte(1);                                          // version
        out.writeByte(Config.getSerializerAlgorithm().ordinal());  // serializer algorithm
        out.writeByte(msg.getMessageType());                       // message type
        out.writeInt(msg.getSequenceId());                         // sequence id
        out.writeByte(0xff);                                       // padding
        out.writeInt(bytes.length);                                // body length
        out.writeBytes(bytes);                                     // body
        outList.add(out);
    }

    /**
     * Reads one frame from {@code in} and emits the deserialized {@link Message}.
     *
     * @throws IllegalArgumentException if the frame names an unknown serializer
     *                                  algorithm or an unregistered message type
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        in.readInt();                              // magic number (not validated)
        in.readByte();                             // version
        byte serializerAlgorithm = in.readByte();
        byte messageType = in.readByte();
        in.readInt();                              // sequence id (also carried in the body)
        in.readByte();                             // padding
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);

        Serializer.Algorithm[] algorithms = Serializer.Algorithm.values();
        if (serializerAlgorithm < 0 || serializerAlgorithm >= algorithms.length) {
            throw new IllegalArgumentException("unknown serializer algorithm: " + serializerAlgorithm);
        }
        Serializer.Algorithm algorithm = algorithms[serializerAlgorithm];

        Class<?> registered = Message.getMessageClass(messageType);
        if (registered == null) {
            throw new IllegalArgumentException("unknown message type: " + messageType);
        }
        // asSubclass keeps this type-safe whether the lookup is declared as Class<?> or narrower.
        Class<? extends Message> messageClass = registered.asSubclass(Message.class);

        Message message = algorithm.deserialize(messageClass, bytes);
        out.add(message);
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package cn.itcast.protocol;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;public class ProtocolFrameDecoder extends LengthFieldBasedFrameDecoder { public ProtocolFrameDecoder () { this (1024 , 12 , 4 , 0 , 0 ); } public ProtocolFrameDecoder (int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { super (maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); } }
服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package cn.itcast.server;import cn.itcast.protocol.MessageCodecSharable;import cn.itcast.protocol.ProtocolFrameDecoder;import cn.itcast.server.handler.RpcRequestMessageHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import lombok.extern.slf4j.Slf4j;@Slf4j public class RpcServer { public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup (); NioEventLoopGroup worker = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtocolFrameDecoder ()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = serverBootstrap.bind(8080 ).sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { log.error("server error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
package cn.itcast.server.handler;

import cn.itcast.message.RpcRequestMessage;
import cn.itcast.message.RpcResponseMessage;
import cn.itcast.server.service.ServicesFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * Server-side handler that executes an incoming RPC request by reflection and writes
 * the result, or the failure, back to the caller.
 *
 * <p>The handler has no fields, so it is marked {@code @Sharable} and one instance may
 * be installed in several pipelines.
 */
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {

    /**
     * Looks up the service for the requested interface, invokes the requested method and
     * replies with an {@link RpcResponseMessage} carrying the same sequence id.
     *
     * <p>A response is always written, including when the lookup or invocation fails.
     *
     * @param ctx channel context used to write the response
     * @param msg decoded RPC request
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) throws Exception {
        String interfaceName = msg.getInterfaceName();
        String methodName = msg.getMethodName();
        int sequenceId = msg.getSequenceId();
        Object[] parameterValue = msg.getParameterValue();
        Class<?>[] parameterTypes = msg.getParameterTypes();

        RpcResponseMessage response = new RpcResponseMessage();
        response.setSequenceId(sequenceId);
        try {
            Object service = ServicesFactory.getService(Class.forName(interfaceName));
            Method method = service.getClass().getMethod(methodName, parameterTypes);
            Object invoke = method.invoke(service, parameterValue);
            response.setReturnValue(invoke);
        } catch (Exception e) {
            e.printStackTrace();
            // Only InvocationTargetException is guaranteed to carry a cause; lookup failures
            // such as ClassNotFoundException or NoSuchMethodException do not. Falling back to
            // the exception itself avoids an NPE here that would leave the client waiting.
            Throwable root = e.getCause() != null ? e.getCause() : e;
            // NOTE(review): only the message is sent back, presumably to keep the serialized
            // response small - confirm before attaching the full exception.
            response.setExceptionValue(new Exception("error:" + root.getMessage()));
        }
        ctx.writeAndFlush(response);
    }
}
// Section: Client
package cn.itcast.client;

import cn.itcast.client.handler.RpcResponseMessageHandler;
import cn.itcast.message.RpcRequestMessage;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProtocolFrameDecoder;
import cn.itcast.protocol.SequenceIdGenerator;
import cn.itcast.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;

import java.lang.reflect.Proxy;

/**
 * Client-side entry point: creates dynamic proxies whose method calls are sent to the
 * RPC server over one lazily created, shared channel.
 */
public class RpcClientManager {

    /** Demo: calls {@code HelloService.sayHello} through a proxy and prints the result. */
    public static void main(String[] args) {
        HelloService proxyService = getProxyService(HelloService.class);
        System.out.println(proxyService.sayHello("zhangsan"));
    }

    /**
     * Creates a proxy for {@code serviceClass}. Each call on the proxy is turned into an
     * {@link RpcRequestMessage}, sent to the server, and the calling thread blocks until
     * the matching response arrives.
     *
     * @param serviceClass service interface to proxy
     * @param <T>          type of the service interface
     * @return a proxy implementing {@code serviceClass}
     */
    public static <T> T getProxyService(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class<?>[]{serviceClass};
        Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
            int sequenceId = SequenceIdGenerator.nextId();
            RpcRequestMessage message = new RpcRequestMessage(
                    sequenceId,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );

            Channel ch = getChannel();
            // Register the promise BEFORE writing: otherwise a fast response could reach the
            // response handler first, find no promise, and leave this call waiting forever.
            DefaultPromise<Object> promise = new DefaultPromise<>(ch.eventLoop());
            RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

            ch.writeAndFlush(message).addListener(writeFuture -> {
                if (!writeFuture.isSuccess()) {
                    // The request never left, so no response will come: fail the call now.
                    RpcResponseMessageHandler.PROMISES.remove(sequenceId);
                    promise.tryFailure(writeFuture.cause());
                }
            });

            promise.await();
            if (promise.isSuccess()) {
                return promise.getNow();
            } else {
                throw new RuntimeException(promise.cause());
            }
        });
        return serviceClass.cast(o);
    }

    /** Shared client channel; volatile so the unsynchronized first check below is safe. */
    private static volatile Channel channel = null;
    private static final Object LOCK = new Object();

    /**
     * Returns the shared channel, connecting on first use.
     *
     * @return the channel, or {@code null} if connecting was interrupted
     */
    public static Channel getChannel() {
        if (channel != null) {
            return channel;
        }
        synchronized (LOCK) {
            if (channel != null) {
                return channel;
            }
            initChannel();
            return channel;
        }
    }

    /**
     * Connects to {@code localhost:8080} and stores the resulting channel. The event loop
     * group is shut down when the channel closes, or immediately if connecting fails.
     */
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable messageCodec = new MessageCodecSharable();
        RpcResponseMessageHandler rpcHandler = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProtocolFrameDecoder());
                ch.pipeline().addLast(loggingHandler);
                ch.pipeline().addLast(messageCodec);
                ch.pipeline().addLast(rpcHandler);
            }
        });

        boolean connected = false;
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
            connected = true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can react to it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (!connected) {
                // No close listener was registered, so release the event loop threads here.
                group.shutdownGracefully();
            }
        }
    }
}
package cn.itcast.client.handler;

import cn.itcast.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Client-side handler that completes the pending promise matching each RPC response.
 */
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

    /** Pending calls keyed by sequence id; an entry is removed when its response arrives. */
    public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

    /**
     * Completes the promise registered under the response's sequence id: with a failure if
     * the response carries an exception, otherwise with the return value. Responses with
     * no registered promise are ignored.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
        int sequenceId = msg.getSequenceId();
        Object returnValue = msg.getReturnValue();
        Exception exceptionValue = msg.getExceptionValue();

        // remove() rather than get(): each promise is completed at most once from here.
        Promise<Object> promise = PROMISES.remove(sequenceId);
        if (promise != null) {
            if (exceptionValue != null) {
                promise.setFailure(exceptionValue);
            } else {
                promise.setSuccess(returnValue);
            }
        }
    }
}
// Section: Messages
package cn.itcast.message;

import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * Base class of all protocol messages. It carries the sequence id and exposes a registry
 * mapping numeric message types to their concrete classes.
 */
@Data
public abstract class Message implements Serializable {

    /**
     * Looks up the message class registered for a numeric type.
     *
     * @param messageType numeric message type read from the wire
     * @return the registered class, or {@code null} if the type is not registered
     */
    public static Class<? extends Message> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }

    private int sequenceId;

    private int messageType;

    /** Returns the numeric type of this message, as defined by the concrete subclass. */
    public abstract int getMessageType();

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

    /** Registry: numeric message type to concrete message class. */
    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();

    static {
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package cn.itcast.message;import lombok.Getter;import lombok.ToString;@Getter @ToString(callSuper = true) public class RpcRequestMessage extends Message { private String interfaceName; private String methodName; private Class<?> returnType; private Class[] parameterTypes; private Object[] parameterValue; public RpcRequestMessage (int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) { super .setSequenceId(sequenceId); this .interfaceName = interfaceName; this .methodName = methodName; this .returnType = returnType; this .parameterTypes = parameterTypes; this .parameterValue = parameterValue; } @Override public int getMessageType () { return RPC_MESSAGE_TYPE_REQUEST; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package cn.itcast.message;import lombok.Data;import lombok.ToString;@Data @ToString(callSuper = true) public class RpcResponseMessage extends Message { private Object returnValue; private Exception exceptionValue; @Override public int getMessageType () { return RPC_MESSAGE_TYPE_RESPONSE; } }