flink笔记
Flink架构
Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。它集成了所有常见的集群资源管理器,例如Hadoop YARN、Apache Mesos和Kubernetes,但也可以设置作为独立集群甚至库运行。
Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。
JobManager
obManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:
ResourceManager
ResourceManager 负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位。Flink 为不同的环境和资源提供者(例如 YARN、Mesos、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。
Dispatcher
Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。
JobMaster
JobMaster 负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。
TaskManagers
TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。
必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子
Flink中的API
Flink为流式/批式处理应用程序的开发提供了不同级别的抽象
有状态流处理
如果要跨越多个事件(比如多个windows操作)记录信息,这种操作就叫做有状态。
Keyed State
及时流处理
及时流处理是有状态流处理的扩展。 可以用来处理时间序列分析
时间的概念:事件时间和处理时间
事件时间:
事件时间是每个独立的事件在它的生产设备上产生的时间。这个时间一般是时间在进去Flink之前嵌入的。
处理时间
处理时间指的是机器进行相应的操作时的系统时间。
事件时间和水印
Flink中计算程序的event time的原理是watermarks。watermarks作为数据流的一部分,并且带着一个timestamp。Watermark(t)表示数据流中的event time已经到达时间t了,意味着数据流中不应该有timestamp t' <= t的元素。
并行流中的watermarks
Watermarks策略简介
为了使用事件时间语义,Flink应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。通常通过使用TimestampAssignerAPI从元素中的某个字段去访问/提取时间戳。
时间戳的分配和watermark的生成是齐头并进的,可以告诉Flink应用程序事件时间的进度。可以通过指定WatermarkGenerator来配置watermark的生成方式。
使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。WatermarkStrategy 接口如下:
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>, WatermarkGetneratorSupplier<T>{
/**
* 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* 根据策略实例化一个 watermark 生成器。
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
通常情况下,不用实现此接口,而是使用WatermarkStrategy工具类中通用的watermark策略,或者可以使用这个工具类将自定义的TimestampAssigner 与 WatermarkGenerator 进行绑定。例如,你想要要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.f0);
使用Watermark策略
WatermarkStrategy 可以在 Flink 应用程序中的两处使用,第一种是直接在数据源上使用,第二种是直接在非数据源的操作之后使用。
第一种方式相比会更好,因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息。
处理空闲数据源
如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。
为了解决这个问题,你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。WatermarkStrategy 为此提供了一个工具接口:
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));
自定义WatermarkGenerator
WatermarkGenerator 接口代码如下:
/**
* {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
*
* <p><b>注意:</b> WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks}
* 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。
*/
@Public
public interface WatermarkGenerator<T> {
/**
* 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark。
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* 周期性的调用,也许会生成新的 watermark,也许不会。
*
* <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。
*/
void onPeriodicEmit(WatermarkOutput output);
}
watermark 的生成方式本质上是有两种:周期性生成和标记生成。
周期性生成器通常通过 onEvent() 观察传入的事件数据,然后在框架调用 onPeriodicEmit() 时发出 watermark。
标记生成器将查看 onEvent() 中的事件数据,并等待检查在流中携带 watermark 的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出 watermark。通常情况下,标记生成器不会通过 onPeriodicEmit() 发出 watermark。
窗口
窗口的生命周期
简而言之,只要第一个属于这个窗口的元素到达,窗口就建立了。当时间(事件时间或处理时间)加上用户指定的允许的延迟,超过了它的结束时间戳时,这个窗口就会完全删除。Flink保证只会删除那些基于时间的窗口,其他类型的窗口,比如说全局窗口,不会删除。
Keyed vs Non-Keyed Windows
第一件需要确定的事情,是你的stream是否需要keyed。这个需要在确定window之前就做好。使用keyBy(...)会将你的无限的stream分成有逻辑的keyed streams。如果keyBy(...)没有被调用,你的stream就不会被keyed。
对于keyed stream,输入事件的任意属性都可以被用做key。使用一个keyed stream可以让你的窗口计算并行运行在多个任务上,因 为每个逻辑的keyed stream可以独立于其他的stream运行。拥有同一个key的所有元素都会被发送到同一个并行任务。
对于non-keyed streams,你的原始的stream不会被拆分成多个逻辑streams,而且所有的窗口逻辑都会在同一个单个任务中执行,即parallelism是1。
Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
数据管道 & ETL
无状态的转换
map()
Apache FLink的一种常见应用场景是ETL(抽取、转换、加载)管道任务。从一个或多个数据源获取数据,进行一些转换操作和信息补充,将结果存储起来。
官方示例代码:
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
DataStream<EnrichedRide> enrichedNYCRides = rides
.filter(new RideCleansingSolution.NYCFilter())
.map(new Enrichment());
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
@Override
public EnrichedRide map(TaxiRide taxiRide) throws Exception {
return new EnrichedRide(taxiRide);
}
}
flatmap()
MapFunction只适用于一对一的转换:对每个进入算子的流元素,map()将仅输出一个转换后的元素。对于除此之外的场景,需要使用flatmap()
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
DataStream<EnrichedRide> enrichedNYCRides = rides
.flatMap(new NYCEnrichment());
enrichedNYCRides.print();
其中用到的FlatMapFunction:
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
@Override
public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
if (valid.filter(taxiRide)) {
out.collect(new EnrichedRide(taxiRide));
}
}
}
使用接口中提供的 Collector ,flatmap() 可以输出你想要的任意数量的元素,也可以一个都不发。
Keyed Streams
keyBy()
keyBy将一个流根据其中的一些属性来进行分区,从而使所有具有相同属性的事件分到相同的组里。
rides
.flatMap(new NYCEnrichment())
.keyBy(enrichedRide -> enrichedRide.startCell)
每个 keyBy 会通过 shuffle 来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。

KeySelector 不仅限于从事件中抽取键。也可以按想要的方式计算得到键值:
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
Keyed Stream的聚合
reduce()和其他聚合算子
有状态的转换
在flink不参与管理状态的情况下,你的应用也可以使用状态,但Flink为其管理状态提供了一些引人注目的特性:
- 本地性
- 持久性
- 纵向可扩展性
- 横向可扩展性
- 可查询性
Rich Functions
Flink的几种函数接口,包括FilterFunction、MapFunction、FlatMapFunction,都是单一抽象方法模式。Flink同样也为他们提供了一个所谓rich的变体,如RichFlatMapFunction,其中增加了以下放大,包括:
open(Configuration)close()getRuntimeContext()
open()仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。
getRuntimeContext()是创建和访问Flink状态的途径
一个使用Keyed State的例子
在这个例子里,想象有一个要去重的事件数据流,对每个键只保留第一个事件。下面是完成这个功能的应用,使用一个名为Deduplicator的RichFlatMapFunction:
private static class Event {
public final String key;
public final long timestamp;
...
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicator())
.print();
env.execute();
}
Deduplicator需要记录每个键是否已经有了相应的记录。它将通过使用Flink的keyed state接口来做这件事,Flink会为每个状态中管理的条目维护一个键值存储。Flink支持几种不同方式的keyed state,这个例子使用的是最简单的一个,叫做ValaueState。意思是对于每个键,FLink将存储一个单一的对象。
public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (keyHasBeenSeen.value() == null) {
out.collect(event);
keyHasBeenSeen.update(true);
}
}
}
部署在分布式集群时,将会有很多 Deduplicator 的实例,每一个实例将负责整个键空间的互斥子集中的一个。所以,当你看到一个单独的 ValueState,比如
ValueState<Boolean> keyHasBeenSeen;
要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。
清理状态
在键无限增长的应用中,清除再也不会使用的状态是很必要的:
keyHasBeenSeen.clear()
也可以选择使用 状态的过期时间(TTL),为状态描述符配置你想要旧状态自动被清除的时间。
Connected Streams
当需要更灵活地调整转换的某些功能,可以使用connected streams ,一个单独的算子有两个输入流。
connected stream 也可以被用来实现流的关联。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> control = env
.fromElements("DROP", "IGNORE")
.keyBy(x -> x);
DataStream<String> streamOfWords = env
.fromElements("Apache", "DROP", "Flink", "IGNORE")
.keyBy(x -> x);
control
.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();
env.execute();
}
两个流只有键一致的时候才能连接。 keyBy 的作用是将流数据分区,当 keyed stream 被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使按键关联两个流成为可能。
在这个例子中,两个流都是 DataStream<String> 类型的,并且都将字符串作为键。正如你将在下面看到的,RichCoFlatMapFunction 在状态中存了一个布尔类型的变量,这个变量被两个流共享。
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;
@Override
public void open(Configuration config) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}
@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}
flatMap1 和 flatMap2 的调用顺序是没法控制的,这两个输入流是相互竞争的关系。