flink笔记

Flink架构

Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。它集成了所有常见的集群资源管理器,例如Hadoop YARNApache MesosKubernetes,但也可以设置作为独立集群甚至库运行。

Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager

The processes involved in executing a Flink dataflow

JobManager

obManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:

  • ResourceManager

    ResourceManager 负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位。Flink 为不同的环境和资源提供者(例如 YARN、Mesos、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。

  • Dispatcher

    Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。

  • JobMaster

    JobMaster 负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。

TaskManagers

TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。

必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子

Flink中的API

Flink为流式/批式处理应用程序的开发提供了不同级别的抽象

Programming levels of abstraction

有状态流处理

如果要跨越多个事件(比如多个windows操作)记录信息,这种操作就叫做有状态。

Keyed State

及时流处理

及时流处理是有状态流处理的扩展。 可以用来处理时间序列分析

时间的概念:事件时间和处理时间

  • 事件时间:

    事件时间是每个独立的事件在它的生产设备上产生的时间。这个时间一般是时间在进去Flink之前嵌入的。

  • 处理时间

    处理时间指的是机器进行相应的操作时的系统时间。

    Event Time and Processing Time

事件时间和水印

Flink中计算程序的event time的原理是watermarks。watermarks作为数据流的一部分,并且带着一个timestamp。Watermark(t)表示数据流中的event time已经到达时间t了,意味着数据流中不应该有timestamp t' <= t的元素。

A data stream with events (in order) and watermarks

A data stream with events (out of order) and watermarks

并行流中的watermarks

Parallel data streams and operators with events and watermarks

Watermarks策略简介

为了使用事件时间语义,Flink应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。通常通过使用TimestampAssignerAPI从元素中的某个字段去访问/提取时间戳。

时间戳的分配和watermark的生成是齐头并进的,可以告诉Flink应用程序事件时间的进度。可以通过指定WatermarkGenerator来配置watermark的生成方式。

使用 Flink API 时需要设置一个同时包含 TimestampAssignerWatermarkGeneratorWatermarkStrategyWatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。WatermarkStrategy 接口如下:

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>, WatermarkGetneratorSupplier<T>{

    /**
     * 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * 根据策略实例化一个 watermark 生成器。
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

通常情况下,不用实现此接口,而是使用WatermarkStrategy工具类中通用的watermark策略,或者可以使用这个工具类将自定义的TimestampAssignerWatermarkGenerator 进行绑定。例如,你想要要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);


使用Watermark策略

WatermarkStrategy 可以在 Flink 应用程序中的两处使用,第一种是直接在数据源上使用,第二种是直接在非数据源的操作之后使用。

第一种方式相比会更好,因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息。

处理空闲数据源

如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为空闲输入空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。

为了解决这个问题,你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。WatermarkStrategy 为此提供了一个工具接口:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));

自定义WatermarkGenerator

WatermarkGenerator 接口代码如下:

/**
 * {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
 *
 * <p><b>注意:</b>  WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks} 
 * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark。
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期性的调用,也许会生成新的 watermark,也许不会。
     *
     * <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。
     */
    void onPeriodicEmit(WatermarkOutput output);
}

watermark 的生成方式本质上是有两种:周期性生成标记生成

周期性生成器通常通过 onEvent() 观察传入的事件数据,然后在框架调用 onPeriodicEmit() 时发出 watermark。

标记生成器将查看 onEvent() 中的事件数据,并等待检查在流中携带 watermark 的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出 watermark。通常情况下,标记生成器不会通过 onPeriodicEmit() 发出 watermark。

窗口

窗口的生命周期

简而言之,只要第一个属于这个窗口的元素到达,窗口就建立了。当时间(事件时间或处理时间)加上用户指定的允许的延迟,超过了它的结束时间戳时,这个窗口就会完全删除。Flink保证只会删除那些基于时间的窗口,其他类型的窗口,比如说全局窗口,不会删除。

Keyed vs Non-Keyed Windows

第一件需要确定的事情,是你的stream是否需要keyed。这个需要在确定window之前就做好。使用keyBy(...)会将你的无限的stream分成有逻辑的keyed streams。如果keyBy(...)没有被调用,你的stream就不会被keyed

对于keyed stream,输入事件的任意属性都可以被用做key。使用一个keyed stream可以让你的窗口计算并行运行在多个任务上,因 为每个逻辑的keyed stream可以独立于其他的stream运行。拥有同一个key的所有元素都会被发送到同一个并行任务。

对于non-keyed streams,你的原始的stream不会被拆分成多个逻辑streams,而且所有的窗口逻辑都会在同一个单个任务中执行,即parallelism是1。

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

数据管道 & ETL

无状态的转换

map()

Apache FLink的一种常见应用场景是ETL(抽取、转换、加载)管道任务。从一个或多个数据源获取数据,进行一些转换操作和信息补充,将结果存储起来。

官方示例代码:

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
    .filter(new RideCleansingSolution.NYCFilter())
    .map(new Enrichment());

public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {

    @Override
    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
        return new EnrichedRide(taxiRide);
    }
}

flatmap()

MapFunction只适用于一对一的转换:对每个进入算子的流元素,map()将仅输出一个转换后的元素。对于除此之外的场景,需要使用flatmap()

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
    .flatMap(new NYCEnrichment());

enrichedNYCRides.print();

其中用到的FlatMapFunction:

public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {

    @Override
    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
        if (valid.filter(taxiRide)) {
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}

使用接口中提供的 Collectorflatmap() 可以输出你想要的任意数量的元素,也可以一个都不发。

Keyed Streams

keyBy()

keyBy将一个流根据其中的一些属性来进行分区,从而使所有具有相同属性的事件分到相同的组里。

rides
    .flatMap(new NYCEnrichment())
    .keyBy(enrichedRide -> enrichedRide.startCell)

每个 keyBy 会通过 shuffle 来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。

keyBy and network shuffle

KeySelector 不仅限于从事件中抽取键。也可以按想要的方式计算得到键值:

keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))

Keyed Stream的聚合

reduce()和其他聚合算子

有状态的转换

在flink不参与管理状态的情况下,你的应用也可以使用状态,但Flink为其管理状态提供了一些引人注目的特性:

  • 本地性
  • 持久性
  • 纵向可扩展性
  • 横向可扩展性
  • 可查询性

Rich Functions

Flink的几种函数接口,包括FilterFunction、MapFunction、FlatMapFunction,都是单一抽象方法模式。Flink同样也为他们提供了一个所谓rich的变体,如RichFlatMapFunction,其中增加了以下放大,包括:

  • open(Configuration)
  • close()
  • getRuntimeContext()

open()仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。

getRuntimeContext()是创建和访问Flink状态的途径

一个使用Keyed State的例子

在这个例子里,想象有一个要去重的事件数据流,对每个键只保留第一个事件。下面是完成这个功能的应用,使用一个名为DeduplicatorRichFlatMapFunction

private static class Event {
    public final String key;
    public final long timestamp;
    ...
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  
    env.addSource(new EventSource())
        .keyBy(e -> e.key)
        .flatMap(new Deduplicator())
        .print();
  
    env.execute();
}

Deduplicator需要记录每个键是否已经有了相应的记录。它将通过使用Flink的keyed state接口来做这件事,Flink会为每个状态中管理的条目维护一个键值存储。Flink支持几种不同方式的keyed state,这个例子使用的是最简单的一个,叫做ValaueState。意思是对于每个键,FLink将存储一个单一的对象。

public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
    ValueState<Boolean> keyHasBeenSeen;

    @Override
    public void open(Configuration conf) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
        keyHasBeenSeen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (keyHasBeenSeen.value() == null) {
            out.collect(event);
            keyHasBeenSeen.update(true);
        }
    }
}

部署在分布式集群时,将会有很多 Deduplicator 的实例,每一个实例将负责整个键空间的互斥子集中的一个。所以,当你看到一个单独的 ValueState,比如

ValueState<Boolean> keyHasBeenSeen;

要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。

清理状态

在键无限增长的应用中,清除再也不会使用的状态是很必要的:

keyHasBeenSeen.clear()

也可以选择使用 状态的过期时间(TTL),为状态描述符配置你想要旧状态自动被清除的时间。

Connected Streams

当需要更灵活地调整转换的某些功能,可以使用connected streams ,一个单独的算子有两个输入流。

connected streams

connected stream 也可以被用来实现流的关联。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env
        .fromElements("DROP", "IGNORE")
        .keyBy(x -> x);

    DataStream<String> streamOfWords = env
        .fromElements("Apache", "DROP", "Flink", "IGNORE")
        .keyBy(x -> x);
  
    control
        .connect(streamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}

两个流只有键一致的时候才能连接。 keyBy 的作用是将流数据分区,当 keyed stream 被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使按键关联两个流成为可能。

在这个例子中,两个流都是 DataStream<String> 类型的,并且都将字符串作为键。正如你将在下面看到的,RichCoFlatMapFunction 在状态中存了一个布尔类型的变量,这个变量被两个流共享。

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    private ValueState<Boolean> blocked;
      
    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }
      
    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }
      
    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(data_value);
        }
    }
}

flatMap1flatMap2 的调用顺序是没法控制的,这两个输入流是相互竞争的关系。

事件驱动应用

容错处理

Last Updated: 5/24/2021, 5:57:04 PM