20200622 说说 flink Watermark 原理


啥叫个水位线 Watermark

  • Watermark 是一种告诉 Flink 一个消息延迟多少的方式。它定义了什么时候不再等待更早的数据。
  • 可以把 Watermarks 理解为一个水位线,这个 Watermarks 在不断的变化。Watermark 实际上作为数据流的一部分随数据流流动。
  • 当 Flink 中的运算符接收到 Watermarks 时,它明白早于该时间的消息已经完全抵达计算引擎,即假设不会再有时间小于水位线的事件到达。
  • 这个假设是触发窗口计算的基础,只有水位线越过窗口对应的结束时间,窗口才会关闭和进行计算。

具体的应用中

在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做 窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全 部到达才开始处理。这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据到达的完整性),保证事件数据(全部)到达 Flink 系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳。

那么 Flink 是怎么计算 Watermak 的值呢?

Watermark = 进入Flink的最大的事件时间(mxtEventTime)指定的延迟时间(t)

那么有 Watermark 的 Window 是怎么触发窗口函数的呢?

如果有窗口的停止时间等于或者小于maxEventTime – t(当时的 warkmark),那么 这个窗口被触发执行。 注意:Watermark 本质可以理解成一个延迟触发机制。

一个具体的 🌰1

假如我们设置 10s 的时间窗口(window),那么0~10s10~20s都是一个窗口,以0~10s为例,0 为 start-time,10 为 end-time。假如有 4 个数据的 event-time 分别是8(A),12.5(B),9(C),13.5(D),首先我们看到数据是乱序的,数据 C 是延迟到达的,我们设置 Watermarks 为当前所有到达数据 event-time 的最大值减去延迟值 3.5 秒,

当 A 到达的时候,Watermarks 为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算
当 B 到达的时候,Watermarks 为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当 C 到达的时候,Watermarks 为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当 D 到达的时候,Watermarks 为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算
触发计算的时候,会将 A,C(因为他们都小于 10)都计算进去,其中 C 是迟到的。
max 这个很关键,就是当前窗口内,所有事件的最大事件。
这里的延迟 3.5s 是我们假设一个数据到达的时候,比他早 3.5s 的数据肯定也都到达了,这个是需要根据经验推算。假设加入 D 到达以后有到达了一个 E,event-time=6,但是由于 0~10 的时间窗口已经开始计算了,所以 E 就丢了。
从这里上面 E 的丢失说明,水位线也不是万能的,但是如果根据我们自己的生产经验+侧道输出等方案,可以做到数据不丢失。

一个具体的 🌰2

需求:每隔 5 秒中统计一下最近 10 秒内每个基站中通话时间最长的一次通话发生的呼叫时间、主叫号码,被叫号码,通话时长。并且还得告诉我到底是哪个时间范围(10 秒) 内的。注意:基站日志数据传入的时候是无序的,通过观察发现时间最多延迟了 3 秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 每隔5秒中统计一下最近10秒内每个基站中通话时间最长的一次通话发生的
* 呼叫时间、主叫号码,被叫号码,通话时长。
* 并且还得告诉我到底是哪个时间范围(10秒)内的。 */


object MaxLongCallTime {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流计算)上下文执行环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
import org.apache.flink.streaming.api.scala._
//读取文件数据
val data = streamEnv.socketTextStream("hadoop101", 8888).map(line => {
var arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.to Long)
})
.assignTimestampsAndWatermarks(
//引入Watermark
new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(3)) {
//延迟3 秒
override def extractTimestamp(element: StationLog) = {
element.callTime
}
})
//分组,开窗处理
data.keyBy(_.sid).timeWindow(Time.seconds(10), Time.seconds(5))
//reduce 函数做增量聚合 ,MaxTimeAggregate能做到来一条数据处理一条,
// ReturnMaxTime 在窗口触发的时候调用
.reduce(new MaxTimeReduce, new ReturnMaxTime).print()
streamEnv.execute()
}


class MaxTimeReduce extends ReduceFunction[StationLog] {
override def reduce(t: StationLog, t1: StationLog): StationLog = {
//通话时间比较
if (t.duration > t1.duration) t else t1
}
}


class ReturnMaxTime extends WindowFunction[StationLog, String, String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[StationLog], out: Collector[String]): Unit = {
var sb = new StringBuilder sb
.append("窗口范围是: ").append(window.getStart).append("----").append(window.getEnd) sb
.append("\n") sb
.append("通话日志:").append(input.iterator.next()) out
.collect(sb.toString())
}
}

}

参考

https://www.cnblogs.com/rossiXYZ/p/12286407.html


文章作者: Callable
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Callable !
评论
  目录