I tried using Netty to write a log collection service, similar in function to Logstash. When we collect logs we may need to apply a lot of custom policies that Logstash can't support, so I tried writing my own with Netty.
After reading some Netty tutorials, the setup roughly boils down to these steps.
First, create two event loop groups: a bossGroup and a workerGroup. Then create a ServerBootstrap; let's call the object b.
- b.group(bossGroup, workerGroup)
registers both groups with the bootstrap
- b.channel(NioServerSocketChannel.class)
specifies the channel model our server uses
Finally, configure the handlers for our server.
The complete code is as follows:
public class NettyServer {
    private int port;
    private final MessageHandler messageHandler;

    public NettyServer(int port, MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(
                            new ProcessingHandler(messageHandler));
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
            log.info("starting netty server " + Thread.currentThread().getName() + " on port " + port);
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
Then we just need to start this server:
(new Thread(() -> {
    while (true) {
        try {
            new NettyServer(5000, new LogstashHandler(kafkaProducer, config)).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}, "logstash-server")).start();
You may have noticed the ProcessingHandler above; this is the core of the data processing:
public class ProcessingHandler extends ChannelInboundHandlerAdapter {
    private StringBuffer sBuffer = new StringBuffer();
    private static final String NEW_LINE = "\n";
    private static final String HOST_KEY = "host";
    private final MessageHandler messageHandler;
    ObjectMapper mapper = new ObjectMapper();

    public ProcessingHandler(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        final String s = in.toString(CharsetUtil.UTF_8);
        try {
            if (s.endsWith(NEW_LINE)) {
                String hostAddress = "";
                if (ctx.channel().remoteAddress() instanceof InetSocketAddress) {
                    hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
                }
                if (sBuffer.length() == 0) {
                    onMessage(s, hostAddress);
                } else {
                    sBuffer.append(s);
                    onMessage(sBuffer.toString(), hostAddress);
                    sBuffer.delete(0, sBuffer.length());
                }
            } else {
                sBuffer.append(s);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error(e.getMessage());
        } finally {
            in.release();
        }
    }

    @SuppressWarnings("unchecked")
    private void onMessage(String message, String host) {
        String[] messages = message.split(NEW_LINE);
        for (String s : messages) {
            try {
                HashMap<String, String> hashMap = mapper.readValue(s, HashMap.class);
                hashMap.put(HOST_KEY, host);
                messageHandler.onMessage(hashMap);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("data:" + message + " error:" + e.getMessage());
            }
        }
    }
}
We write our own ProcessingHandler extending ChannelInboundHandlerAdapter, and in its channelRead we can read the data sent by the peer.
When I first wrote this, I found that the data read in channelRead could be partial: for a large payload, logback splits it into multiple packets before sending it to the server.
So I treat a trailing line separator as the marker that a complete record has arrived.
logback may also merge several records into one packet, so in my own onMessage I have to split them apart again.
The overall logic works fine.
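The reassembly logic described above (buffer fragments until a chunk ends with a newline, then split the buffered data since merged packets may carry several records) can be sketched independently of Netty. The class name LineFramer is mine, for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of line-based reassembly: chunks may be partial records
// (split packets) or several records at once (merged packets). Buffered
// data is only flushed once a chunk arrives that ends with '\n'.
public class LineFramer {
    private final StringBuilder buffer = new StringBuilder();

    // Feed one received chunk; returns the complete records now available.
    public List<String> feed(String chunk) {
        List<String> out = new ArrayList<>();
        buffer.append(chunk);
        if (chunk.endsWith("\n")) {
            // A trailing newline means the buffered data is complete;
            // split it, because one chunk may carry several merged records.
            for (String line : buffer.toString().split("\n")) {
                if (!line.isEmpty()) {
                    out.add(line);
                }
            }
            buffer.setLength(0);
        }
        return out;
    }
}
```

This is the same buffer-then-split idea the handler uses; in a real pipeline, Netty's built-in LineBasedFrameDecoder can also do this framing for you.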
In my first version of the code, I created new ProcessingHandler(messageHandler) in the NettyServer constructor, stored it in a field on NettyServer, and then in the ChannelInitializer simply wrote:
ch.pipeline().addLast(processingHandler)
When the project started and a client connected, the console reported an error:
Failed to initialize a channel. Closing: [id: 0xe648b9c0, L:/127.0.0.1:5000 - R:/127.0.0.1:45748]
io.netty.channel.ChannelPipelineException: com.logdevour.devour.netty.handler.ProcessingHandler is not a @Sharable handler, so can't be added or removed multiple times.
I then found some advice online saying I should add the @Sharable annotation to my ProcessingHandler. I added it, and the error indeed went away.
After the project had run for a while, I noticed the data was getting mixed up: client A's data would show up under client B, causing all kinds of parsing errors.
So I dug into it again. As I mentioned earlier, my clients send data in fragments, and I defined a StringBuffer in ProcessingHandler to stage partial data. If I pass a single instance to every pipeline, then every client is effectively sharing that object — no wonder their data got mixed together.
This is exactly why the framework forces you to opt in explicitly with the @Sharable annotation: you should be genuinely certain that your handler is shareable. Otherwise, you must pass a fresh instance each time in the ChannelInitializer so that connections cannot interfere with each other. The correct form is what I pasted at the beginning: just new the handler inside the ChannelInitializer.
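The data-mixing failure mode can be reproduced in plain Java, with no Netty involved. The sketch below (class and method names are illustrative, not from the project) shows two simulated clients each writing fragments: with one shared buffer the fragments interleave, while per-connection buffers keep them apart — which is what newing the handler per channel achieves:

```java
// Illustration of why a stateful handler must not be shared:
// two clients each send fragments of a message; with one shared
// buffer the fragments interleave, with per-client buffers they don't.
public class SharableDemo {
    public static String withSharedBuffer() {
        StringBuffer shared = new StringBuffer(); // one instance for both clients
        shared.append("A-part1|");                // client A's first fragment
        shared.append("B-part1|");                // client B's fragment lands in between
        shared.append("A-part2");                 // client A "completes" its message
        return shared.toString();                 // A's message is now corrupted
    }

    public static String withPerClientBuffers() {
        StringBuffer forA = new StringBuffer();   // a fresh instance per connection,
        StringBuffer forB = new StringBuffer();   // as done in the ChannelInitializer
        forA.append("A-part1|");
        forB.append("B-part1|");
        forA.append("A-part2");
        return forA.toString();                   // A's message stays intact
    }
}
```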
Conclusion
Don't use the @Sharable annotation carelessly. Add it only if your handler really is shareable; otherwise, create a new handler instance each time a connection is initialized. You need to judge your project's specific situation (whether the handler holds shared state), rather than blindly slapping on the annotation the way most answers online suggest.
References:
https://stackoverflow.com/questions/25714860/adding-netty-handler-after-channel-initialization-netty-4-0-17-final