
Flink hands-on introduction

Flink's official walkthrough documentation is excellent. Read the demo code on GitHub carefully, follow the docs and run everything yourself, and you come away with a working grasp of many Flink fundamentals; it really gets you in the door.

For a component as popular as Flink, the web is flooded with tutorials of wildly varying quality. In practice, the official demos are the best introduction.

What the official tutorial taught me:

  • How to start a Flink session cluster together with commonly paired components such as Kafka, MySQL, and Elasticsearch (startup commands are sketched right after this list)
  • Continuously ingesting from Kafka straight into MySQL
  • Continuously ingesting from Kafka straight into Elasticsearch
  • The concepts and usage of windows + watermarks
  • Flink's ETL concepts
    • Flink requires registering a table for both the source and the sink, corresponding to the extract and load steps in ETL
    • Flink SQL acts as the transform step in ETL, reshaping the data in between
  • A simple way to generate data continuously, with a throttle for rate limiting
  • How to test backpressure: sleep inside a map
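
For reference, bringing up that playground environment is just a couple of docker-compose commands. A minimal sketch following the flink-playgrounds README, assuming Docker and docker-compose are installed:

# Fetch the official playgrounds and start the operations playground.
git clone https://github.com/apache/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose build    # build the custom client / data-generator images
docker-compose up -d    # start jobmanager, taskmanager, kafka, zookeeper, ...
docker-compose ps       # check that all containers are up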

One takeaway after testing: even with the Flink streaming pipeline in place, I still don't feel entirely at ease; I'd want to re-run the computation offline as a batch job, or at the very least reconcile the two results. As a beginner you simply don't trust a streaming pipeline until you know the entire chain, from source code to operations, inside out. No wonder the Lambda architecture came first in the streaming world: run everything once as a stream, and once more as a batch.

Official code

  • flink walkthrough

https://github.com/apache/flink-playgrounds

  • flink training

https://github.com/apache/flink-training/tree/master/

Docs: real-time reporting with the Table API

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/table_api/

Docs: Flink operations playground

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/flink-operations-playground/

Common commands

Flink's CLI commands generally have a corresponding REST API operation. Some CLI commands may take several API calls to reproduce. For example, submitting and running a Flink jar job is a single flink run -d xxx.jar on the command line, while over the API you first upload the jar and then issue a separate run request.
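
A rough sketch of that two-step flow over the REST API, assuming the playground's JobManager at jobmanager:8081; the <jar-id> placeholder has to be taken from the JSON returned by the upload call:

# Step 1: upload the jar; the response contains the server-side jar id.
curl -X POST -H "Expect:" -F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload

# Step 2: start a job from the uploaded jar.
curl -X POST http://jobmanager:8081/jars/<jar-id>/run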


docker-compose run --no-deps client flink list

Waiting for response...
------------------ Running/Restarting Jobs -------------------
10.09.2023 14:51:30 : d3b73713a4153d616d89006b19e08931 : Click Event Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.


docker-compose run --no-deps client curl jobmanager:8081/jobs

{"jobs":[{"id":"d3b73713a4153d616d89006b19e08931","status":"RUNNING"}]}
docker-compose kill taskmanager
[+] Killing 1/1
✔ Container operations-playground-taskmanager-1 Killed 0.3s

In the JobManager, the running job keeps being resubmitted, but with no TaskManager available it stays stuck cycling through the resubmit-and-schedule phase.

picture 0

In the JobManager log, the automatic resubmission fails:

2023-09-10 15:08:38,868 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: ClickEvent Source (1/1) (b1467af81ce4d534df9a6d88be23704d_bc764cd8ddf7a0cff126f51c16239658_0_131) switched from CREATED to SCHEDULED.
2023-09-10 15:08:38,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - ClickEvent Counter (1/1) (b1467af81ce4d534df9a6d88be23704d_0a448493b4782967b150582570326227_0_131) switched from CREATED to SCHEDULED.
2023-09-10 15:08:38,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - ClickEventStatistics Sink: Writer (1/1) (b1467af81ce4d534df9a6d88be23704d_ea632d67b7d595e5b851708ae9ad79d6_0_131) switched from CREATED to SCHEDULED.
2023-09-10 15:08:38,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - ClickEventStatistics Sink: Committer (1/1) (b1467af81ce4d534df9a6d88be23704d_6d2677a0ecc3fd8df0b72ec675edf8f4_0_131) switched from CREATED to SCHEDULED.
2023-09-10 15:08:38,868 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job d3b73713a4153d616d89006b19e08931: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-09-10 15:08:38,934 WARN org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Could not fulfill resource requirements of job d3b73713a4153d616d89006b19e08931. Free slots: 0
2023-09-10 15:08:38,935 WARN org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge [] - Could not acquire the minimum required resources, failing slot requests. Acquired: []. Current slot pool status: Registered TMs: 0, registered slots: 0 free slots: 0
2023-09-10 15:08:38,935 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: ClickEvent Source (1/1) (b1467af81ce4d534df9a6d88be23704d_bc764cd8ddf7a0cff126f51c16239658_0_131) switched from SCHEDULED to FAILED on [unassigned resource].
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
2023-09-10 15:08:38,935 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the results produced by task execution b1467af81ce4d534df9a6d88be23704d_bc764cd8ddf7a0cff126f51c16239658_0_131.
2023-09-10 15:08:38,936 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 (#131) of source Source: ClickEvent Source.
2023-09-10 15:08:38,936 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 4 tasks will be restarted to recover the failed task b1467af81ce4d534df9a6d88be23704d_bc764cd8ddf7a0cff126f51c16239658_0_131.

Start the TaskManager again and the job can be observed recovering.

The Kafka records that arrived while the TaskManager was down were not lost; the results came out correct after recovery.

Question: where does the Kafka data that arrives during the outage live? In memory? Or can Kafka consumption simply be paused? (My understanding: neither. The records just stay on the Kafka broker, and the Kafka source keeps its offsets in Flink's checkpoints, so after recovery it resumes reading from the last checkpointed offset.)
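
One way to watch this from the Kafka side while the TaskManager is down, assuming the playground's Kafka container ships the standard Kafka CLI tools (the group id click-event-count is the one configured in ClickEventCount.java further below):

# Describe the job's consumer group; the LAG column keeps growing while the
# taskmanager is down, showing the unread records are retained on the broker.
docker-compose exec kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe --group click-event-count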

docker-compose up -d  taskmanager
[+] Running 2/2
✔ Container operations-playground-jobmanager-1 Running 0.0s
✔ Container operations-playground-taskmanager-1 Started

The returned id is reportedly a trigger id, which can be used to follow up on the details of the savepoint being taken.

docker-compose run --no-deps client curl jobmanager:8081/jobs/d3b73713a4153d616d89006b19e08931/stop -d '{"drain": false}'  

{"request-id":"a5fbbb5adfc7a3b0f20cf58413baea64"}

Checking the savepoint details; something was off, though (note that the trigger id used below differs from the request-id returned above, which may well be the cause).

docker-compose run --no-deps client curl jobmanager:8081/jobs/d3b73713a4153d616d89006b19e08931/savepoints/a5fb31bb5adfc7a3b0f20cf58413baea64 


curl localhost:8081/jobs
curl localhost:8081/jobs/d3b73713a4153d616d89006b19e08931/stop -d '{"drain": false}'
curl localhost:8081/jobs/d3b73713a4153d616d89006b19e08931/savepoints/a5fb31bb5adfc7a3b0f20cf58413baea64
curl localhost:8081/jobs/d3b73713a4153d616d89006b19e08931/savepoints/8aebc5d6be9fbb59ce4b31de40b0524e


backpressure

In the Flink map operation, a 100 ms sleep is added during even-numbered minutes, throttling the rate to about 10 records per second; in the UI the task then shows up as busy.

picture 1
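
Besides the web UI, the REST API also exposes per-vertex backpressure samples. A sketch with placeholder ids (the real job and vertex ids have to be read out of the /jobs responses first):

# List the job (vertex ids are part of the response), then sample one vertex.
curl localhost:8081/jobs/<job-id>
curl localhost:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure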

On watermarks and windows

Only after asking ChatGPT did I learn that watermarks and windows are not the same thing. A window defines a time span over which streaming data is grouped, and the computation runs over that span. A watermark, by contrast, is a remedy for out-of-order and late data: it specifies how long Flink waits before it considers an earlier time slice complete, and within that grace period late-arriving records can still be folded in and reordered.

tEnv.executeSql("CREATE TABLE transactions (\n" +
" account_id BIGINT,\n" +
" amount BIGINT,\n" +
" transaction_time TIMESTAMP(3),\n" +
" WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'transactions',\n" +
" 'properties.bootstrap.servers' = 'kafka:9092',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'csv'\n" +
")");

picture 3

This explanation is interesting; so that is what the backward-reaching watermark means: once the current timestamp has been reached, the window one interval before the current timestamp may be closed. With the 5-second watermark above, for example, an event stamped 12:00:10 advances the watermark to 12:00:05, so any window ending at or before 12:00:05 can be finalized.

picture 7

picture 2

picture 4

SELECT
    TUMBLE_START(transaction_time, INTERVAL '10' MINUTE) AS wStart,
    TUMBLE_END(transaction_time, INTERVAL '10' MINUTE) AS wEnd,
    account_id,
    SUM(amount) AS total_amount
FROM transactions
GROUP BY
    TUMBLE(transaction_time, INTERVAL '10' MINUTE),
    account_id


tEnv.executeSql("CREATE TABLE transactions (\n" +
" account_id BIGINT,\n" +
" amount BIGINT,\n" +
" transaction_time TIMESTAMP(3),\n" +
" WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'transactions',\n" +
" 'properties.bootstrap.servers' = 'kafka:9092',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'csv'\n" +
")");

tEnv.executeSql("SELECT " +
" TUMBLE_START(transaction_time, INTERVAL '10' MINUTE) AS wStart," +
" TUMBLE_END(transaction_time, INTERVAL '10' MINUTE) AS wEnd," +
" account_id," +
" SUM(amount) AS total_amount " +
"FROM transactions " +
"GROUP BY " +
" TUMBLE(transaction_time, INTERVAL '10' MINUTE), " +
" account_id");

Reading more carefully later on, I found the docs do in fact provide a window-based version:

public static Table report(Table transactions) {
    return transactions
        .window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts").start().as("log_ts"),
            $("amount").sum().as("amount"));
}

Consulting ChatGPT-4

WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
    .<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(200))
    .withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime());

picture 5

WindowAssigner<Object, TimeWindow> assigner = params.has(EVENT_TIME_OPTION) ?
    TumblingEventTimeWindows.of(WINDOW_SIZE) :
    TumblingProcessingTimeWindows.of(WINDOW_SIZE);

DataStream<ClickEventStatistics> statistics = clicks
    .keyBy(ClickEvent::getPage)
    .window(assigner)
    .aggregate(new CountingAggregator(),
        new ClickEventStatisticsCollector())
    .name("ClickEvent Counter");

picture 6

flink run -d /opt/ClickCountJob.jar: this says the job jar to run is /opt/ClickCountJob.jar; the -d flag runs the job detached, i.e. in the background.

How throttle (rate limiting) and backpressure are written

Both live in the official demo code; idioms like these you only pick up by poking around other people's code.

https://github.com/apache/flink-playgrounds

throttle

Each second is split into roughly 20 batches; once the quota for the current slice has been generated, the producer just sleeps until the slice is over. Very simple. For example, at maxRecordsPerSecond = 100 the batch size works out to 100/20 + 1 = 6 records and a slice lasts 6 x 10 ms = 60 ms: emit 6 records flat out, then sleep off the remainder of the 60 ms.

So there is no such thing as truly uniform rate limiting here. All it can do is cut time into small slices, produce at full speed inside each slice, and take a breather once the quota is reached.

https://github.com/apache/flink-playgrounds/blob/master/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java


package org.apache.flink.playground.datagen;

/** A data throttler that controls the rate at which data is written out to Kafka. */
final class Throttler {

    private final long throttleBatchSize;
    private final long nanosPerBatch;

    private long endOfNextBatchNanos;
    private int currentBatch;

    Throttler(long maxRecordsPerSecond) {
        if (maxRecordsPerSecond == -1) {
            // unlimited speed
            throttleBatchSize = -1;
            nanosPerBatch = 0;
            endOfNextBatchNanos = System.nanoTime() + nanosPerBatch;
            currentBatch = 0;
            return;
        }
        final float ratePerSubtask = (float) maxRecordsPerSecond;

        if (ratePerSubtask >= 10000) {
            // high rates: all throttling in intervals of 2ms
            throttleBatchSize = (int) ratePerSubtask / 500;
            nanosPerBatch = 2_000_000L;
        } else {
            throttleBatchSize = ((int) (ratePerSubtask / 20)) + 1;
            nanosPerBatch = ((int) (1_000_000_000L / ratePerSubtask)) * throttleBatchSize;
        }
        this.endOfNextBatchNanos = System.nanoTime() + nanosPerBatch;
        this.currentBatch = 0;
    }

    void throttle() throws InterruptedException {
        if (throttleBatchSize == -1) {
            return;
        }
        if (++currentBatch != throttleBatchSize) {
            return;
        }
        currentBatch = 0;

        final long now = System.nanoTime();
        final int millisRemaining = (int) ((endOfNextBatchNanos - now) / 1_000_000);

        if (millisRemaining > 0) {
            endOfNextBatchNanos += nanosPerBatch;
            Thread.sleep(millisRemaining);
        } else {
            endOfNextBatchNanos = now + nanosPerBatch;
        }
    }
}

https://github.com/apache/flink-playgrounds/blob/master/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java


/** Generates CSV transaction records at a rate */
public class Producer implements Runnable, AutoCloseable {

    private volatile boolean isRunning;

    private final String brokers;

    private final String topic;

    public Producer(String brokers, String topic) {
        this.brokers = brokers;
        this.topic = topic;
        this.isRunning = true;
    }

    @Override
    public void run() {
        KafkaProducer<Long, Transaction> producer = new KafkaProducer<>(getProperties());

        Throttler throttler = new Throttler(100);

        TransactionSupplier transactions = new TransactionSupplier();

        while (isRunning) {

            Transaction transaction = transactions.get();

            long millis = transaction.timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();

            ProducerRecord<Long, Transaction> record =
                new ProducerRecord<>(topic, null, millis, transaction.accountId, transaction);
            producer.send(record);

            try {
                throttler.throttle();
            } catch (InterruptedException e) {
                isRunning = false;
            }
        }

        producer.close();
    }

    @Override
    public void close() {
        isRunning = false;
    }
}


backpressure

This implementation is also rather clever: inside the Flink map operation it sleeps 100 ms per record, which amounts to roughly 10 map calls per second. It exploits the fact that the map operator processes records serially on a single task thread, so the sleeps stall the whole pipeline behind it.

/**
 * This MapFunction causes severe backpressure during even-numbered minutes.
 * E.g., from 10:12:00 to 10:12:59 it will only process 10 events/sec,
 * but from 10:13:00 to 10:13:59 events will pass through unimpeded.
 */
public class BackpressureMap implements MapFunction<ClickEvent, ClickEvent> {

    private boolean causeBackpressure() {
        return ((LocalTime.now().getMinute() % 2) == 0);
    }

    @Override
    public ClickEvent map(ClickEvent event) throws Exception {
        if (causeBackpressure()) {
            Thread.sleep(100);
        }

        return event;
    }
}

How backpressure is used

https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java


KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
    .setTopics(inputTopic)
    .setValueOnlyDeserializer(new ClickEventDeserializationSchema())
    .setProperties(kafkaProps)
    .build();

WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
    .<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(200))
    .withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime());

DataStream<ClickEvent> clicks = env.fromSource(source, watermarkStrategy, "ClickEvent Source");

if (inflictBackpressure) {
    // Force a network shuffle so that the backpressure will affect the buffer pools
    clicks = clicks
        .keyBy(ClickEvent::getPage)
        .map(new BackpressureMap())
        .name("Backpressure");
}

WindowAssigner<Object, TimeWindow> assigner = params.has(EVENT_TIME_OPTION) ?
    TumblingEventTimeWindows.of(WINDOW_SIZE) :
    TumblingProcessingTimeWindows.of(WINDOW_SIZE);

DataStream<ClickEventStatistics> statistics = clicks
    .keyBy(ClickEvent::getPage)
    .window(assigner)
    .aggregate(new CountingAggregator(),
        new ClickEventStatisticsCollector())
    .name("ClickEvent Counter");

statistics.sinkTo(
    KafkaSink.<ClickEventStatistics>builder()
        .setBootstrapServers(kafkaProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
        .setKafkaProducerConfig(kafkaProps)
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic(outputTopic)
                .setValueSerializationSchema(new ClickEventStatisticsSerializationSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build())
    .name("ClickEventStatistics Sink");

env.execute("Click Event Count");

Some source and sink demos

SQL version: Kafka source with MySQL sink

https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java


public class SpendReport {

    public static Table report(Table transactions) {
        return transactions.select(
                $("account_id"),
                $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
                $("amount"))
            .groupBy($("account_id"), $("log_ts"))
            .select(
                $("account_id"),
                $("log_ts"),
                $("amount").sum().as("amount"));
    }

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE transactions (\n" +
            " account_id BIGINT,\n" +
            " amount BIGINT,\n" +
            " transaction_time TIMESTAMP(3),\n" +
            " WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'transactions',\n" +
            " 'properties.bootstrap.servers' = 'kafka:9092',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n" +
            " 'format' = 'csv'\n" +
            ")");

        tEnv.executeSql("CREATE TABLE spend_report (\n" +
            " account_id BIGINT,\n" +
            " log_ts TIMESTAMP(3),\n" +
            " amount BIGINT,\n" +
            " PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
            ") WITH (\n" +
            " 'connector' = 'jdbc',\n" +
            " 'url' = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
            " 'table-name' = 'spend_report',\n" +
            " 'driver' = 'com.mysql.jdbc.Driver',\n" +
            " 'username' = 'sql-demo',\n" +
            " 'password' = 'demo-sql'\n" +
            ")");

        Table transactions = tEnv.from("transactions");
        report(transactions).executeInsert("spend_report");
    }
}

Java version: Kafka source with Kafka sink

https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java


public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    configureEnvironment(params, env);

    boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);

    String inputTopic = params.get("input-topic", "input");
    String outputTopic = params.get("output-topic", "output");
    String brokers = params.get("bootstrap.servers", "localhost:9092");
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");

    KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
        .setTopics(inputTopic)
        .setValueOnlyDeserializer(new ClickEventDeserializationSchema())
        .setProperties(kafkaProps)
        .build();

    WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
        .<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(200))
        .withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime());

    DataStream<ClickEvent> clicks = env.fromSource(source, watermarkStrategy, "ClickEvent Source");

    if (inflictBackpressure) {
        // Force a network shuffle so that the backpressure will affect the buffer pools
        clicks = clicks
            .keyBy(ClickEvent::getPage)
            .map(new BackpressureMap())
            .name("Backpressure");
    }

    WindowAssigner<Object, TimeWindow> assigner = params.has(EVENT_TIME_OPTION) ?
        TumblingEventTimeWindows.of(WINDOW_SIZE) :
        TumblingProcessingTimeWindows.of(WINDOW_SIZE);

    DataStream<ClickEventStatistics> statistics = clicks
        .keyBy(ClickEvent::getPage)
        .window(assigner)
        .aggregate(new CountingAggregator(),
            new ClickEventStatisticsCollector())
        .name("ClickEvent Counter");

    statistics.sinkTo(
        KafkaSink.<ClickEventStatistics>builder()
            .setBootstrapServers(kafkaProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
            .setKafkaProducerConfig(kafkaProps)
            .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                    .setTopic(outputTopic)
                    .setValueSerializationSchema(new ClickEventStatisticsSerializationSchema())
                    .build())
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build())
        .name("ClickEventStatistics Sink");

    env.execute("Click Event Count");
}

Python version: Kafka source with Elasticsearch sink

https://github.com/apache/flink-playgrounds/blob/master/pyflink-walkthrough/payment_msg_proccessing.py

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.expressions import call, col
from pyflink.table.udf import udf


provinces = ("Beijing", "Shanghai", "Hangzhou", "Shenzhen", "Jiangxi", "Chongqing", "Xizang")


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def province_id_to_name(id):
    return provinces[id]


def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

    create_kafka_source_ddl = """
            CREATE TABLE payment_msg(
                createTime VARCHAR,
                orderId BIGINT,
                payAmount DOUBLE,
                payPlatform INT,
                provinceId INT
            ) WITH (
                'connector' = 'kafka',
                'topic' = 'payment_msg',
                'properties.bootstrap.servers' = 'kafka:9092',
                'properties.group.id' = 'test_3',
                'scan.startup.mode' = 'latest-offset',
                'format' = 'json'
            )
            """

    create_es_sink_ddl = """
            CREATE TABLE es_sink(
                province VARCHAR PRIMARY KEY,
                pay_amount DOUBLE
            ) with (
                'connector' = 'elasticsearch-7',
                'hosts' = 'http://elasticsearch:9200',
                'index' = 'platform_pay_amount_1',
                'document-id.key-delimiter' = '$',
                'sink.bulk-flush.max-size' = '42mb',
                'sink.bulk-flush.max-actions' = '32',
                'sink.bulk-flush.interval' = '1000',
                'sink.bulk-flush.backoff.delay' = '1000',
                'format' = 'json'
            )
            """

    t_env.execute_sql(create_kafka_source_ddl)
    t_env.execute_sql(create_es_sink_ddl)
    t_env.register_function('province_id_to_name', province_id_to_name)

    t_env.from_path("payment_msg") \
        .select(call('province_id_to_name', col('provinceId')).alias("province"), col('payAmount')) \
        .group_by(col('province')) \
        .select(col('province'), call('sum', col('payAmount')).alias("pay_amount")) \
        .execute_insert("es_sink")



created at 2023-09-11