尝试用netty写了个日志收集服务,类似于logstash的功能,因为我们才采集日志的时候可能有很多的策略要去做,logstash的功能不够支撑,所以尝试使用netty自己写。
看了一些netty的教程,大概是以下几个步骤
- new两个 NioEventLoopGroup
一个是bossGroup,一个是workerGroup
- new一个ServerBootstrap
搞出来的对象暂且叫b
- b.group(bossGroup,workerGroup)
帮这两个group搞里头
- b.channel(NioServerSocketChannel.class)
指定下我们server所使用的channel模型
- b.childHandler
给我们的server配置handler
- 一些其他配置
完整代码如下:
public class NettyServer { private int port; private final MessageHandler messageHandler; public NettyServer(int port, MessageHandler messageHandler) { this.messageHandler = messageHandler; this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast( new ProcessingHandler(messageHandler)); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); log.info("starting netty server " + Thread.currentThread().getName() + " on port " + port); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
然后我们只要启动这个server就行了:
(new Thread(() -> { while (true) { try { new NettyServer(5000, new LogstashHandler(kafkaProducer, config)).run(); } catch (Exception e) { e.printStackTrace(); } } }, "logstash-server")).start();
你可能注意到上面有个 ProcessingHandler,这个是数据处理的核心代码:
public class ProcessingHandler extends ChannelInboundHandlerAdapter { private StringBuffer sBuffer = new StringBuffer(); private static final String NEW_LINE = "\n"; private static final String HOST_KEY = "host"; private final MessageHandler messageHandler; ObjectMapper mapper = new ObjectMapper(); public ProcessingHandler(MessageHandler messageHandler) { this.messageHandler = messageHandler; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; final String s = in.toString(CharsetUtil.UTF_8); try{ if(s.endsWith(NEW_LINE)) { String hostAddress = ""; if(ctx.channel().remoteAddress() instanceof InetSocketAddress){ hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(); } if(sBuffer.length() == 0) { onMessage(s, hostAddress); }else { sBuffer.append(s); onMessage(sBuffer.toString(), hostAddress); sBuffer.delete(0, sBuffer.length()); } }else { sBuffer.append(s); } }catch(Exception e){ e.printStackTrace(); log.error(e.getMessage()); }finally{ in.release(); } } @SuppressWarnings("unchecked") private void onMessage(String message, String host) { String[] messages = message.split(NEW_LINE); for (String s : messages) { try { HashMap<String, String> hashMap = mapper.readValue(s, HashMap.class); hashMap.put(HOST_KEY, host); messageHandler.onMessage(hashMap); } catch (Exception e) { e.printStackTrace(); log.error("data:" + message + " error:" + e.getMessage()); } } } }
我们自己写个ProcessingHandler 来继承ChannelInboundHandlerAdapter,在他的channelRead里面就能读取到对端数据了。
起初我在写的时候,发现channelRead里面读取的数据可能是残的,就是说如果有大的包,logback会拆包发给服务端。
那我就根据行标识来认定数据是一条完整数据。
logback可能还会合并包,所以我在我自己写的onMessage里面还要再拆包。
大概逻辑没啥问题
我第一个版本写的代码是在 NettyServer的构造函数里面写的 new ProcessingHandler(messageHandler) 然后直接挂到NettyServer属性上面,然后在他的 ChannelInitializer里面直接这样写了
ch.pipeline().addLast(processingHandler)
项目启动,有客户端连上来的时候,控制台会报错:
Failed to initialize a channel. Closing: [id: 0xe648b9c0, L:/127.0.0.1:5000 – R:/127.0.0.1:45748]
io.netty.channel.ChannelPipelineException: com.logdevour.devour.netty.handler.ProcessingHandler is not a @Sharable handler, so can’t be added or removed multiple times.
然后我在网上找了些资料,说是要在我的ProcessingHandler上面加个注解@Sharable,然后我就加了这个注解,确实不报错了。
项目跑了一段时间,我发现数据错乱,A客户端的数据会跑到B客户端上面来,导致解析也各种报错。
然后我又仔细研究了一下,因为我前面提到我的客户端在传数据的时候是分段的,然后我在ProcessingHandler里面定义了一个StringBuffer来暂存数据的,如果我还用单例来传给pipeline的话,相当于每个客户端都是在共享这个对象了,难怪他们的数据会混到一起了。
这也是官方为什么要限制你必须要显示的配置@Sharable注解的原因了,你应该真的确定你的这个handler是可以share的,不然的话,必须要在ChannelInitializer的时候每次传入不同的对象实例避免他们数据干扰。具体的写法就是我最前面贴的那样,你要在ChannelInitializer里面去new他就行了。
结论
@Sharable注解不要乱用,你的handler真的是可以share的才可以加,不然的话,需要你每次在初始化会话的时候new新的handler进去。你项目的具体情况你自己需要判断(是否有数据共享),而不是向网上大多数答案那样单纯的加个注解来解决。
参考资料: