Press "Enter" to skip to content

Setting up a Maven Flink project in IDEA that consumes Kafka data for stream processing

Creating the project

Create a new project in IDEA and choose the Archetype: org.apache.maven.archetypes:maven-archetype-quickstart
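
The same skeleton can also be generated from the command line if you are not using the IDE wizard:

mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-quickstart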

Once the project is created, add the Flink, logging, and packaging dependencies to the pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.xiaocaicai.fs.flink</groupId>
  <artifactId>devour-flink</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>devour-flink</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.18.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.18.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>1.18.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.18.1</version>
    </dependency>
    <!-- Note: flink-connector-kafka is versioned independently of Flink itself;
         1.16.1 is an older release that still ships the legacy FlinkKafkaConsumer
         used below, while newer releases (e.g. 3.x for Flink 1.17+) move to KafkaSource. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.16.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>2.0.11</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>2.0.11</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>4.2.3</version>
    </dependency>
    <!-- JSON parsing dependency -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.16.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version> <!-- use a version appropriate for your setup -->
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                  Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!-- The main class of the job -->
                  <mainClass>com.xiaocaicai.fs.flink.KafkaFlinkIntegration</mainClass>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
With the pom in place, here is the main job class. It consumes the apilog topic, parses each message into a LogRecord, keeps only production provider-side records, and counts calls per interface and method over one-minute windows:

package com.xiaocaicai.fs.flink;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaFlinkIntegration {

    static ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // Set up the Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "devour-flink");

        // Create the Flink streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create the Kafka source
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
                "apilog",
                new SimpleStringSchema(),
                properties);

        // Build the processing pipeline
        DataStreamSink<LogRecord> sink = env
                .addSource(kafkaSource)
                .map(new MapFunction<String, LogRecord>() { // parse the JSON payload
                    @Override
                    public LogRecord map(String value) throws Exception {
                        return parseJson(value);
                    }
                })
                .filter(new FilterFunction<LogRecord>() { // filter condition
                    @Override
                    public boolean filter(LogRecord record) throws Exception {
                        if(record == null){
                            return false;
                        }
                        return "production".equals(record.env) && record.providerSide;
                    }
                })
                .keyBy(record -> record.interfaceId + record.methodName) // key by interface + method
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // one-minute tumbling window
                .sum("count") // sum the per-record counts
                .print(); // print the results

        // Launch the Flink job
        env.execute("Kafka Flink Integration");
    }

    private static LogRecord parseJson(String json) {
        // JSON parsing logic: convert the JSON string into a LogRecord object,
        // here using Jackson (any JSON library would do)
        try {
            return objectMapper.readValue(json, LogRecord.class);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }

}
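
A side note: FlinkKafkaConsumer is deprecated, and with the externalized Kafka connector (versions such as 3.x, paired with Flink 1.17+) the replacement is the KafkaSource builder. A minimal sketch of an equivalent source, assuming the same broker and topic as above:

// requires org.apache.flink.connector.kafka.source.KafkaSource,
// org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer,
// and org.apache.flink.api.common.eventtime.WatermarkStrategy
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("apilog")
        .setGroupId("devour-flink")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// env.addSource(kafkaSource) then becomes:
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); // chain .map/.filter as before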
The LogRecord POJO that each JSON message is parsed into:

package com.xiaocaicai.fs.flink;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class LogRecord {
    public String interfaceId;
    public String methodName;
    public String env;
    public boolean providerSide;
    public int count = 1; // each record counts as 1

    @Override
    public String toString() {
        return "LogRecord{" +
                "interfaceId='" + interfaceId + '\'' +
                ", methodName='" + methodName + '\'' +
                '}';
    }

    // other fields as needed...
}
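
For reference, a Kafka message that parseJson maps into this POJO could look like the following (the values are made up; any extra fields are ignored thanks to @JsonIgnoreProperties):

{"interfaceId": "com.example.api.OrderService", "methodName": "createOrder", "env": "production", "providerSide": true}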

This is a simple aggregation scenario: counting, per interface, the number of calls each minute.

Packaging the project with Maven produces a fat jar several tens of megabytes in size. Upload it as a job to the Flink cluster, fill in the main class (package name.class name), and click start to launch the job.
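
If you prefer the command line to the web UI, the equivalent steps are roughly (paths depend on your installation):

mvn clean package
./bin/flink run -c com.xiaocaicai.fs.flink.KafkaFlinkIntegration target/devour-flink-1.0-SNAPSHOT.jar

Since the shade plugin sets mainClass in the manifest, the -c flag can usually be omitted.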

Right now the job prints its results to the log; you can also write a RedisSink to receive them:

package com.xiaocaicai.fs.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class RedisSink extends RichSinkFunction<LogRecord> {
    private transient Jedis jedis;

    @Override
    public void open(Configuration config) {
        this.jedis = new Jedis("localhost", 6379);
        this.jedis.auth("redispass");
    }

    @Override
    public void invoke(LogRecord value, Context context) {
        jedis.set("resultKey", value.toString()); // 以您的格式存储数据
    }

    @Override
    public void close() {
        jedis.close();
    }
}
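
Note that the fixed resultKey above means each window result overwrites the previous one. A sketch of a per-interface key instead (the key layout and TTL here are assumptions, not from the original job):

    @Override
    public void invoke(LogRecord value, Context context) {
        // hypothetical key layout: one entry per interface+method, expiring after two windows
        String key = "apilog:" + value.interfaceId + ":" + value.methodName;
        jedis.setex(key, 120, String.valueOf(value.count));
    }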

Then replace the .print() call with addSink(new RedisSink()):

                .keyBy(record -> record.interfaceId + record.methodName) // key by interface + method
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // one-minute tumbling window
                .sum("count") // sum the per-record counts
                .addSink(new RedisSink());
                //.print();

References:

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/configuration/maven/
