My requirement: I have a real-time data stream on Kafka containing request logs for our interfaces (APIs).
I want to compute, in real time, the call frequency and latency of each interface, and I also want the same frequency and latency stats aggregated by system (application).
In other words, I want to apply a shared initial filter to a single source, fan the stream out into branches, and compute each set of metrics separately.
// Kafka connection settings -- bootstrap servers and group id below are placeholders, adjust for your cluster
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "log-analysis");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        "logdata",
        new SimpleStringSchema(),
        properties);
DataStream<String> stream = env.addSource(consumer);
// Parse the raw JSON and apply the shared filter once
DataStream<YourRecordType> parsedStream = stream
        .map(new YourJsonDeserializer());
DataStream<YourRecordType> filteredStream = parsedStream
        .filter(record -> "production".equals(record.getEnv()) && record.isProviderSide())
        // The event-time windows below never fire without timestamps and watermarks;
        // this assumes the record exposes its own event timestamp (getTimestamp())
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<YourRecordType>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((record, ts) -> record.getTimestamp()));
// Branch 1: aggregate per interfaceId + methodName
filteredStream
        // separator avoids key collisions like ("ab","c") vs ("a","bc")
        .keyBy(record -> record.getInterfaceId() + "#" + record.getMethodName())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new YourCountAggregateFunction())
.addSink(new YourOutputSink());
// Branch 2: aggregate per appName (the system dimension)
filteredStream
.keyBy(record -> record.getAppName())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new YourAppAggregateFunction())
.addSink(new YourOutputSink());
env.execute("Kafka Flink Integration");
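To make the latency side concrete, here is one way the accumulator behind a `YourCountAggregateFunction` could work. This is a hypothetical sketch: the class and method names (`CallStats`, `add`, `merge`, `avgLatencyMs`) are mine, not from Flink. It is written as a plain Java class so the logic is easy to test in isolation; wiring it into Flink's `AggregateFunction<IN, ACC, OUT>` is then a thin adapter (`createAccumulator` returns a fresh `CallStats`, `add` calls `add`, `merge` calls `merge`, `getResult` reads the getters).

```java
// Hypothetical per-key accumulator for call frequency and latency.
// In Flink this would be the ACC type of an AggregateFunction.
public class CallStats {
    private long count;          // number of calls seen in the window
    private long totalLatencyMs; // sum of latencies, for the average
    private long maxLatencyMs;   // worst single call seen

    // Called once per record (maps to AggregateFunction.add)
    public void add(long latencyMs) {
        count++;
        totalLatencyMs += latencyMs;
        maxLatencyMs = Math.max(maxLatencyMs, latencyMs);
    }

    // Flink may merge partial accumulators (maps to AggregateFunction.merge)
    public CallStats merge(CallStats other) {
        CallStats out = new CallStats();
        out.count = this.count + other.count;
        out.totalLatencyMs = this.totalLatencyMs + other.totalLatencyMs;
        out.maxLatencyMs = Math.max(this.maxLatencyMs, other.maxLatencyMs);
        return out;
    }

    public long getCount() { return count; }

    public long getMaxLatencyMs() { return maxLatencyMs; }

    public double avgLatencyMs() {
        return count == 0 ? 0.0 : (double) totalLatencyMs / count;
    }
}
```

Keeping the accumulator free of Flink types also lets the same class serve both branches: the interface-level and app-level aggregate functions differ only in the key, not in the stats they maintain.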