处理数据
旁路输出
只有在特定的函数中才能使用旁路输出,具体如下
1)ProcessFunction。
2)KeyedProcessFunction。
3)CoProcessFunction。
4)ProcessWindowFunction。
5)ProcessAllWindowFunction。
6)ProcessJoinFunction。
7)KeyedCoProcessFunction。
.....
public class RuntimeEnvironment implements Environment {
private final JobID jobId;
private final JobVertexID jobVertexId;
private final ExecutionAttemptID executionId;
private final TaskInfo taskInfo;
private final Configuration jobConfiguration;
private final Configuration taskConfiguration;
private final ExecutionConfig executionConfig;
private final UserCodeClassLoader userCodeClassLoader;
private final MemoryManager memManager;
private final IOManager ioManager;
private final BroadcastVariableManager bcVarManager;
private final TaskStateManager taskStateManager;
private final GlobalAggregateManager aggregateManager;
private final InputSplitProvider splitProvider;
private final ExternalResourceInfoProvider externalResourceInfoProvider;
private final Map<String, Future<Path>> distCacheEntries;
private final ResultPartitionWriter[] writers;
private final IndexedInputGate[] inputGates;
private final TaskEventDispatcher taskEventDispatcher;
private final CheckpointResponder checkpointResponder;
private final TaskOperatorEventGateway operatorEventGateway;
private final AccumulatorRegistry accumulatorRegistry;
private final TaskKvStateRegistry kvStateRegistry;
private final TaskManagerRuntimeInfo taskManagerInfo;
private final TaskMetricGroup metrics;
private final Task containingTask;
RuntimeContext是Function运行时的上下文,封装了Function运行时可能需要的所有信息,让Function在运行时能够获取到作业级别的信息,如并行度相关信息、Task名称、执行配置信息(ExecutionConfig)、State等
物理Transformation
物理Transformation一共有4种,具体如下 SourceTransformation,SinkTransformation,OneInputTransformation,TwoInputTransformation
所有的算子都包含了生命周期管理、状态与容错管理、数据处理3个方面的关键行为。
生命周期管理
1)setup:初始化环境、时间服务、注册监控等。
2)open:该行为由各个具体的算子负责实现,包含了算子的初始化逻辑,如状态初始化等。算子执行该方法之后,才会执行Function进行数据的处理。
3)close:所有的数据处理完毕之后关闭算子,此时需要确保将所有的缓存数据向下游发送。
4)dispose:该方法在算子生命周期的最后阶段执行,此时算子已经关闭,停止处理数据,进行资源的释放。
状态与容错管理
算子负责状态管理,提供状态存储,触发检查点的时候,保存状态快照,并且将快照异步保存到外部的分布式存储。当作业失败的时候算子负责从保存的快照中恢复状态
数据处理
算子对数据的处理,不仅会进行数据记录的处理,同时也会提供对Watermark和LatencyMarker的处理。算子按照单流输入和双流输入,定义了不同的行为接口
函数层次
UDF在DataStream API层使用,Flink提供的函数体系从接口的层级来看,从高阶Function到低阶Function
RichFunction相比无状态Function,有两方面的增强:
1)增加了open和close方法来管理Function的生命周期,在作业启动时,Function在open方法中执行初始化,在Function停止时,在close方法中执行清理,释放占用的资源等。无状态Function不具备此能力。
2)增加了getRuntimeContext和setRuntimeContext。通过RuntimeContext,RichFunction能够获取到执行时作业级别的参数信息,而无状态Function不具备此能力。
无状态Function天然是容错的,作业失败之后,重新执行即可,但是有状态的Function(RichFunction)需要处理中间结果的保存和恢复,待有了状态的访问能力,也就意味着Function是可以容错的,执行过程中,状态会进行快照然后备份,在作业失败,Function能够从快照中恢复回来。
处理函数
ProcessFunction:单流输入函数。CoProcessFunction:双流输入函数。KeyedProcessFunction:单流输入函数。KeyedCoProcessFunction:双流输入函数。
广播函数
异步函数
数据源函数
输出函数
检查点函数
连接器在Flink中叫作Connector。Flink本身是计算引擎,并不提供数据存储能力,所以需要访问外部数据,外部数据源类型繁多,连接器因此应运而生,它提供了从数据源读取数据和写入数据的能力。基于SourceFunction和SinkFunction构建出了种类繁多的连接器
在分布式计算中,Flink对所有需要进行唯一标识的组件、对象提供了抽象类AbstractID,因为需要跨网络进行传递,所以该类实现了Serializable接口,需要比较唯一标识是否相同,所以也实现了Comparable接口
如果想了解流处理系统中如何实现强一致性,可以参考MillWheel:Fault-Tolerant Stream Processing at Internet Scale和Discretized Streams:Fault-Tolerant Streaming Computation at Scale两篇论文
WindowAssigner
WindowTrigger
Trigger触发器决定了一个窗口何时能够被计算或清除,每一个窗口都拥有一个属于自己的Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。每当有元素加入该窗口,或者之前注册的定时器超时时,Trigger都会被调用。Trigger触发的结果如下。
1)Continue:继续,不做任何操作。
2)Fire:触发计算,处理窗口数据。
3)Purge:触发清理,移除窗口和窗口中的数据。
4)Fire + Purge:触发计算+清理,处理数据并移除窗口和窗口中的数据。
WindowEvictor
1)CountEvictor: 计数过滤器。在Window中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
2)DeltaEvictor: 阈值过滤器。本质上来说就是一个自定义规则,计算窗口中每个数据记录,然后与一个事先定义好的阈值做比较,丢弃超过阈值的数据记录。
3)TimeEvictor: 时间过滤器。保留Window中最近一段时间内的元素,并丢弃其余元素。
Window函数
1)增量计算函数
2)全量计算函数
1) AscendingTimestamps:递增Watermark,作用在Flink SQL中的Rowtime属性上,Watermark=当前收到的数据元素的最大时间戳-1,此处减1的目的是确保有最大时间戳的事件不会被当做迟到数据丢弃。
2)BoundedOutOfOrderTimestamps:固定延迟Watermark,作用在Flink SQL的Rowtime属性上,Watermark=当前收到的数据元素的最大时间戳-固定延迟。
(2)每事件Watermark策略每事件Watermark策略在Flink中叫作PuntuatedWatamarkAssigner,数据流中每一个递增的EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark,在一定程度上会对下游算子造成压力
(3)无为策略无为策略在Flink中叫作PreserveWatermark。在Flink中可以使用DataStream API和Table & SQL混合编程,所以Flink SQL中不设定Watermark策略,使用底层DataStream中的Watermark策略
Flink内部自主进行内存管理,将数据以二进制结构保存在内存中,目前的实现中大量使用了堆外内存。如果让开发人员直接操作二进制结构,代码会变得复杂臃肿,所以大数据平台在设计API的时候,允许用户直接像编写普通Java应用程序一样使用其API开发Function,直接使用JDK提供的类型和自定义类型。
内存布局
TaskManager是Flink中执行计算的核心组件,是用来运行用户代码的Java进程。其中大量使用了堆外内存。
(1)ValueState<T>即类型为T的单值状态。这个状态与对应的Key绑定,是最简单的状态。可以通过update方法更新状态值,通过value()方法获取状态值。
(2)ListState<T>即Key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable<T>来遍历状态值。
(3)ReducingState<T>这种状态通过用户传入的reduceFunction,每次调用add方法添加值时,会调用reduceFunction,最后合并到一个单一的状态值。
(4)AggregatingState<IN,OUT>聚合State,和(3)不同的是,这里聚合的类型可以是不同的元素类型,使用add(IN)来加入元素,并使用AggregateFunction函数计算聚合结果。
(5)MapState<UK,UV>使用Map存储Key-Value对,通过put(UK,UV)或者putAll(Map<UK,UV>)来添加,使用get(UK)来获取。
按照由Flink管理还是用户自行管理,状态可以分为原始状态(Raw State)和托管状态(Managed State)。原始状态,即用户自定义的State,Flink在做快照的时候,把整个State当作一个整体,需要开发者自己管理,使用byte数组来读写状态内容。托管状态是由Flink框架管理的State,如ValueState、ListState、MapState等,其序列化与反序列化由Flink框架提供支持,无须用户感知、干预。KeyedState和OperatorState可以是原始状态,也可以是托管状态。通常在DataStream上的状态推荐使用托管状态,一般情况下,在实现自定义算子时,才会使用到原始状态。
在图可以看到,业务数据流是一个普通数据流,规则数据流是广播数据流,这样就可以满足实时性、规则更新的要求。规则算子将规则缓存在本地内存中,在业务数据流记录到来时,能够使用规则处理数据。
资源管理器在Flink中叫作ResourceManager。Flink同时支持不同的资源集群类型,ResourceManager位于Flink和资源管理集群(Yarn、K8s等)之间,是Flink集群级资源管理的抽象,其主要作用如下
1)申请容器启动新的TM,或者为作业申请Slot。
2)处理JobManager和TaskManager的异常退出。
3)缓存TaskManager(即容器),等待一段时间之后再释放掉不用的容器,避免资源反复地申请释放。
4)JobManager和TaskManager的心跳感知,对JobManager和TaskManger的退出进行对应的处理
Slot管理器在Flink中叫作SlotManager,是ResourceManager的组件,从全局角度维护当前有多少TaskManager、每个TaskManager有多少空闲的Slot和Slot等资源的使用情况。当Flink作业调度执行时,根据Slot分配策略为Task分配执行的位置
1)对TaskManager提供注册、取消注册、空闲退出等管理动作,注册则集群可用的Slot变多,取消注册、空闲推出则释放资源,还给资源管理集群。
2)对Flink作业,接收Slot的请求和释放、资源汇报等。当资源不足的时候,SlotManger将资源请求暂存在等待队列中,SlotManager通知ResourceManager去申请更多的资源,启动新的TaskManager,TaskManager注册到SlotManager之后,SlotManager就有可用的新资源了,从等待队列中依次分配资源。
SlotProvider接口定义了Slot的请求行为,支持两种请求模式。
1)立即响应模式:Slot请求会立即执行。
2)排队模式:排队等待可用Slot,当资源可用时分配资源
Slot资源池在Flink中叫作SlotPool,是JobMaster中记录当前作业从TaskManager获取的Slot的集合。
预提交阶段
这种方式只能在数据源Kafka到Flink内部保证严格一次,一旦涉及从Sink写入到外部Kafka就会出现问题了。假设Checkpoint 3完成之后,Source从Topic偏移量位置65536读取了1000条数据,Topic偏移量为66536,Sink写入了1000条数据到外部Kafka,此时Flink应用的1个Sink并行实例因为未处理的异常崩溃,进入Failover阶段,应用自动从Checkpoint 3恢复,重新从Topic的偏移量65536开始读取数据,这就会导致65536~66536之间的1000条数据被重复处理,写入到了Kafka中。这种情况下需要避免重复写入这1000条数据到Kafka中。幂等性是一种解决方案,如对HBase按照主键插入可能有效,第2次插入是对第1次的更新。
提交阶段
在预提交阶段,数据实际上已经写入外部存储,但是因为事务的原因是不可读的,所以Sink在事务提交阶段的工作稍微简单一点,当所有的Sink实例提交成功之后,一旦预提交完成,必须确保提交外部事务也要成功,此时算子和外部系统协同来保证。倘若提交外部事务失败(如网络故障等),Flink应用就会崩溃,然后根据用户重启策略进行回滚,回滚到预提交时的状态,之后再次重试提交。
会话窗口不同于事件窗口,它的切分依赖于事件的行为,而不是时间序列,所以在很多情况下会因为事件乱序使得原本相互独立的窗口因为新事件的到来导致窗口重叠,而必须要进行窗口的合并???(过程)