Creating the project
Create a new project in IDEA with the Archetype org.apache.maven.archetypes:maven-archetype-quickstart.
Once the project is created, round out the pom with the Flink, logging, and packaging dependencies:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xiaocaicai.fs.flink</groupId>
    <artifactId>devour-flink</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>devour-flink</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.18.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.18.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.18.1</version>
        </dependency>
        <!-- Note: this connector version intentionally does not match the Flink version above.
             The 1.16.x artifacts still bundle the FlinkKafkaConsumer used below; from Flink 1.17
             on, the Kafka connector is released separately (versions such as 3.x-1.18), where
             KafkaSource replaces it. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.16.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.11</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.11</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>4.2.3</version>
        </dependency>
        <!-- JSON parsing -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.16.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version> <!-- use whatever version suits your environment -->
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- The main class of this job -->
                                    <mainClass>com.xiaocaicai.fs.flink.KafkaFlinkIntegration</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
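One note on the dependencies, per the Flink Maven guide linked at the end: on a real cluster the Flink runtime (flink-core, flink-java, flink-streaming-java, flink-clients) is already on the classpath, so marking those four dependencies `<scope>provided</scope>` shrinks the shaded jar considerably. Keep them at the default compile scope while running from the IDE, though, or the local run won't find the Flink classes.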
package com.xiaocaicai.fs.flink;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaFlinkIntegration {

    static ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "devour-flink");

        // Create the Flink streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create the Kafka source
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
                "apilog",
                new SimpleStringSchema(),
                properties);

        // Build the pipeline
        env
                .addSource(kafkaSource)
                .map(new MapFunction<String, LogRecord>() { // parse JSON
                    @Override
                    public LogRecord map(String value) throws Exception {
                        return parseJson(value);
                    }
                })
                .filter(new FilterFunction<LogRecord>() { // filter records
                    @Override
                    public boolean filter(LogRecord record) throws Exception {
                        if (record == null) { // parseJson returns null on bad input
                            return false;
                        }
                        return "production".equals(record.env) && record.providerSide;
                    }
                })
                .keyBy(record -> record.interfaceId + record.methodName) // key by interface + method
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // one-minute tumbling window
                .sum("count") // count calls per key per window
                .print(); // print the results

        // Start the Flink job
        env.execute("Kafka Flink Integration");
    }

    private static LogRecord parseJson(String json) {
        // Convert the JSON string into a LogRecord using Jackson
        try {
            return objectMapper.readValue(json, LogRecord.class);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }
}
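FlinkKafkaConsumer has been deprecated since Flink 1.14. A minimal sketch of the same source on the newer KafkaSource API, assuming the externalized connector artifact for 1.18 (e.g. flink-connector-kafka 3.0.2-1.18) is on the classpath instead of 1.16.1:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch: replaces the Properties + FlinkKafkaConsumer block above
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("apilog")
        .setGroupId("devour-flink")
        .setStartingOffsets(OffsetsInitializer.latest()) // starting offset is my choice here
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// Replaces env.addSource(kafkaSource); the map/filter/keyBy/window chain attaches unchanged
DataStream<String> raw = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");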
package com.xiaocaicai.fs.flink;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class LogRecord {

    public String interfaceId;
    public String methodName;
    public String env;
    public boolean providerSide;
    public int count = 1; // each record counts as 1

    @Override
    public String toString() {
        return "LogRecord{" +
                "interfaceId='" + interfaceId + '\'' +
                ", methodName='" + methodName + '\'' +
                '}';
    }

    // other fields as needed...
}
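As a quick sanity check that Jackson maps a typical message onto LogRecord (the payload below is hypothetical; only the field names matter, and unknown ones are dropped by @JsonIgnoreProperties):

package com.xiaocaicai.fs.flink;

import com.fasterxml.jackson.databind.ObjectMapper;

public class LogRecordParseCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical payload: "elapsed" is not a LogRecord field and is silently ignored
        String json = "{\"interfaceId\":\"com.demo.UserService\",\"methodName\":\"getUser\","
                + "\"env\":\"production\",\"providerSide\":true,\"elapsed\":12}";
        LogRecord record = new ObjectMapper().readValue(json, LogRecord.class);
        System.out.println(record); // LogRecord{interfaceId='com.demo.UserService', methodName='getUser'}
    }
}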
This is a simple aggregation scenario: counting each interface's calls per minute.
Packaging the project with Maven produces a fat jar of a few dozen megabytes. Upload it to the Flink cluster's job page, fill in the entry point's package.ClassName as the main class, and start the job.
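If you'd rather skip the web UI, the Flink CLI can submit the same jar, along the lines of flink run -c com.xiaocaicai.fs.flink.KafkaFlinkIntegration target/devour-flink-1.0-SNAPSHOT.jar (the path assumes the default Maven output directory).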
Right now the job prints its results to the log; you can also write a RedisSink to receive them:
package com.xiaocaicai.fs.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class RedisSink extends RichSinkFunction<LogRecord> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration config) {
        // One connection per parallel subtask
        this.jedis = new Jedis("localhost", 6379);
        this.jedis.auth("redispass");
    }

    @Override
    public void invoke(LogRecord value, Context context) {
        jedis.set("resultKey", value.toString()); // store the result in whatever format you need
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}
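As written, every window result overwrites the single key resultKey. Below is a sketch of an invoke that keeps one entry per interface+method instead; the key pattern and the 10-minute TTL are illustrative choices, not something from the original job:

@Override
public void invoke(LogRecord value, Context context) {
    // One key per interface+method; setex gives each entry a 600-second TTL so
    // counters from old windows expire on their own (key layout is an assumption)
    String key = "apilog:" + value.interfaceId + ":" + value.methodName;
    jedis.setex(key, 600, String.valueOf(value.count));
}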
Then swap the print for addSink(new RedisSink()):
                .keyBy(record -> record.interfaceId + record.methodName) // key by interface + method
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // one-minute tumbling window
                .sum("count") // count calls per key per window
                .addSink(new RedisSink());
                //.print(); // print the results
References:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/configuration/maven/