package com.xiaocaicai.fs.flink;
import com.xiaocaicai.fs.flink.utils.GSON;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Calendar;
import java.util.Properties;
import java.util.TimeZone;
public class KafkaFlinkIntegration {
public static void main(String[] args) throws Exception {
// Kafka connection properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.200:9092");
properties.setProperty("group.id", "devour-flink");
// Create the Flink streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create the Kafka source for the apiLog topic
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
"apiLog",
new SimpleStringSchema(),
properties);
// Build the base stream: parse JSON and filter records
DataStream<LogRecord> stream = env
.addSource(kafkaSource)
.map(new MapFunction<String, LogRecord>() { // parse each JSON line into a LogRecord
@Override
public LogRecord map(String value) throws Exception {
return parseJson(value);
}
})
.filter(new FilterFunction<LogRecord>() { // keep only production, provider-side records
@Override
public boolean filter(LogRecord record) throws Exception {
if (record == null) {
return false;
}
return "production".equals(record.env) && record.providerSide;
}
});
stream.keyBy(record -> record.interfaceId + record.methodName) // key by interface + method
.window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(5))) // 1-minute window, sliding every 5 seconds
.sum("count") // rolling sum of the count field
.addSink(new SinkFunction<LogRecord>() {
@Override
public void invoke(LogRecord value, Context context) throws Exception {
System.out.println(value.appName + " ----> " + value.interfaceId + " ----> " + value.count);
}
}); // print per-interface call counts
stream.keyBy(record -> record.appName) // key by application name
.window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.minutes(1))) // size == slide, i.e. effectively a 1-minute tumbling window
.aggregate(
new AggregateFunction<LogRecord, Tuple2<Long, Integer>, Tuple2<Double, Integer>>() {
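// Accumulator is (total elapsedTime, number of calls); the result is (average latency, number of calls)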
@Override
public Tuple2<Long, Integer> createAccumulator() {
return new Tuple2<>(0L, 0);
}
@Override
public Tuple2<Long, Integer> add(LogRecord value, Tuple2<Long, Integer> accumulator) {
return new Tuple2<>(accumulator.f0 + value.elapsedTime, accumulator.f1 + 1);
}
@Override
public Tuple2<Double, Integer> getResult(Tuple2<Long, Integer> accumulator) {
return new Tuple2<>(accumulator.f1 == 0 ? 0.0 : (double) accumulator.f0 / accumulator.f1, accumulator.f1);
}
@Override
public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
},
new ProcessWindowFunction<Tuple2<Double, Integer>, String, String, TimeWindow>() {
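// With an AggregateFunction upstream, the iterable holds exactly one element: the pre-aggregated result for this key and window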
@Override
public void process(String key, Context context, Iterable<Tuple2<Double, Integer>> averages, Collector<String> out) {
Tuple2<Double, Integer> average = averages.iterator().next();
out.collect("AppName: " + key + " Count: " + average.f1 + " Average: " + average.f0);
}
}
)
.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println("Application stats and latency: " + value);
}
});
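// Third pipeline: a global (non-keyed) 1-minute window over all records; windowAll runs with parallelism 1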
stream
.windowAll(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.minutes(1)))
.apply(new AllWindowFunction<LogRecord, String, TimeWindow>(){
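// Running total kept in this single operator instance; it accumulates across windows but is not checkpointed, so it starts over if the job restarts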
private long count = 0;
@Override
public void apply(TimeWindow window, Iterable<LogRecord> values, Collector<String> out) throws Exception {
// Reset the running counter at midnight so the total starts over for each calendar day
if (isMidnight(window.getEnd())) {
count = 0;
}
for (LogRecord value : values) {
count++;
}
out.collect("ALL DAY COUNT: " + count);
}
private boolean isMidnight(long timestamp) {
// Create a calendar instance
Calendar calendar = Calendar.getInstance();
// Use the system default time zone here; adjust if the logs use a different zone
calendar.setTimeZone(TimeZone.getDefault());
// Load the window-end timestamp into the calendar
calendar.setTimeInMillis(timestamp);
// Extract the hour and minute of day
int hour = calendar.get(Calendar.HOUR_OF_DAY);
int minute = calendar.get(Calendar.MINUTE);
return hour == 0 && minute == 0;
}
})
.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println("Overall count: " + value);
}
});
// Submit the Flink job
env.execute("Kafka Flink Integration");
}
private static LogRecord parseJson(String json) {
// Convert the JSON string into a LogRecord; a Gson-based GSON utility is used here,
// but Jackson or any other JSON library would work as well
try {
return GSON.fromJson(json, LogRecord.class);
} catch (Exception e) {
// Malformed records yield null and are dropped by the downstream filter
e.printStackTrace();
return null;
}
}
}
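The listing references a LogRecord POJO that is not shown. A minimal sketch of what it might look like, with the field names taken from their usages in the pipeline above (the actual class in the project may differ):

public class LogRecord {
    public String appName;       // calling application
    public String interfaceId;   // invoked interface
    public String methodName;    // invoked method
    public String env;           // deployment environment, e.g. "production"
    public boolean providerSide; // true if the log was recorded on the provider side
    public long elapsedTime;     // call latency (unit assumed to be milliseconds)
    public long count;           // per-record call count, summed by sum("count")

    public LogRecord() {
        // public no-arg constructor so Flink can treat this class as a POJO
    }
}

Public fields plus a no-arg constructor are what allow Flink to handle the type as a POJO, which the field-name aggregation sum("count") relies on.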
Use case overview:
The job consumes API invocation logs from Kafka, first filters the records, and then analyzes them along three dimensions:
1. Call volume per interface
2. Call volume and average latency per application
3. Call volume per calendar day
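The parseJson helper also relies on a project-local GSON utility (com.xiaocaicai.fs.flink.utils.GSON) that is not shown. Assuming it is simply a thin static wrapper around a shared com.google.gson.Gson instance, it could look roughly like this:

package com.xiaocaicai.fs.flink.utils;

import com.google.gson.Gson;

public final class GSON {
    // Single shared Gson instance; Gson is thread-safe, so it can be reused across tasks
    private static final Gson INSTANCE = new Gson();

    private GSON() {
    }

    public static <T> T fromJson(String json, Class<T> clazz) {
        return INSTANCE.fromJson(json, clazz);
    }
}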