Understood. I've organized my notes as follows:
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/flink-sink-order.md
The order in which the Sink receives data (the order in which the Window emits data)
Overview
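- InternalTimerServiceImpl.processingTimeTimersQueue holds one timer for every key in the same window. The first key is taken, WindowOperator.onProcessingTime processes it, and the result is sent to the Sink.
- Keys in InternalTimerServiceImpl.processingTimeTimersQueue are processed in this order: the first element is processed first. After that, the last element is moved into the first position and processed next, and this repeats.
- For example, keys that arrive in the order 1 2 1 3 2 4 5 are processed in this order: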
1
5
4
3
2
Input data:
1 2 1 3 2 4 5
Source code analysis
RecordWriter.emit
- After the WordCount data has passed through the operators (Source, FlatMap, Map), it is emitted via RecordWriter.emit().
- At this point the records are sent in this format:
WordWithCount(1,1)
WordWithCount(2,1)
WordWithCount(1,1)
WordWithCount(3,1)
WordWithCount(2,1)
WordWithCount(4,1)
WordWithCount(5,1)
- These records are then received and processed by WindowOperator.processElement. The sending side, RecordWriter.emit, looks like this:
private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
    serializer.serializeRecord(record);
    boolean pruneAfterCopying = false;
    for (int channel : targetChannels) {
        if (copyFromSerializerToTargetChannel(channel)) {
            pruneAfterCopying = true;
        }
    }
    // Make sure we don't hold onto the large intermediate serialization buffer for too long
    if (pruneAfterCopying) {
        serializer.prune();
    }
}
WindowOperator.processElement(StreamRecord element)
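- WindowOperator.processElement assigns a window to each element such as WordWithCount(1,1). In other words, it determines which window each element belongs to, because records with the same key in the same window have to be aggregated.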
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
- The current element is added to state. The add function aggregates (reduces) values with the same key, so the counts for the same key within the same window are summed here.
windowState.add(element.getValue());
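- triggerContext.onElement(element) sets the trigger for the current element, which is the point in time when its window fires (the window's end time). It also registers a timer for the current element's key in InternalTimerServiceImpl.processingTimeTimersQueue. A timer is registered once per record, but duplicates are dropped (the queue behaves like a Set), so each key appears only once per window. The data later sent to the Sink is produced by walking this processingTimeTimersQueue. The first element is always taken next, and after it is removed, the last element is moved into the first position.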
TriggerResult triggerResult = triggerContext.onElement(element);
public void processElement(StreamRecord<IN> element) throws Exception {
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);
    //if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;
    final K key = this.<K>getKeyedStateBackend().getCurrentKey();
    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();
        for (W window: elementWindows) {
            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(W mergeResult,
                        Collection<W> mergedWindows, W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {
                    if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                        throw new UnsupportedOperationException("The end timestamp of an " +
                            "event-time window cannot become earlier than the current watermark " +
                            "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                            " window: " + mergeResult);
                    } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                        throw new UnsupportedOperationException("The end timestamp of a " +
                            "processing-time window cannot become earlier than the current processing time " +
                            "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                            " window: " + mergeResult);
                    }
                    triggerContext.key = key;
                    triggerContext.window = mergeResult;
                    triggerContext.onMerge(mergedWindows);
                    for (W m: mergedWindows) {
                        triggerContext.window = m;
                        triggerContext.clear();
                        deleteCleanupTimer(m);
                    }
                    // merge the merged state windows into the newly resulting state window
                    windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                }
            });
            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;
            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }
            windowState.setCurrentNamespace(stateWindow);
            windowState.add(element.getValue());
            triggerContext.key = key;
            triggerContext.window = actualWindow;
            TriggerResult triggerResult = triggerContext.onElement(element);
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(actualWindow, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(actualWindow);
        }
        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        for (W window: elementWindows) {
            // drop if the window is already late
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;
            windowState.setCurrentNamespace(window);
            windowState.add(element.getValue());
            triggerContext.key = key;
            triggerContext.window = window;
            TriggerResult triggerResult = triggerContext.onElement(element);
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(window);
        }
    }
    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null){
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}
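For this job, the two state-related steps of processElement are assigning a tumbling window and reducing each record into per-(key, window) state. Both can be mimicked in plain Java. This is a hypothetical, simplified model, not Flink code. The class WindowStateSketch, the "word|start|end" string that stands in for (key, namespace), and the map-based state are all assumptions. The window start assumes a zero offset and non-negative timestamps (start = timestamp - timestamp % size). maxTimestamp = end - 1 is the timestamp that a processing-time trigger registers its timer for. This matches the timer timestamp 1551505439999 for the window ending at 1551505440000 in the queue dump.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of the state side of WindowOperator.processElement. */
public class WindowStateSketch {

    /** Start of the tumbling window containing timestamp (assumes offset 0 and timestamp >= 0). */
    public static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Last timestamp that still belongs to the window [start, start + size), i.e. end - 1. */
    public static long windowMaxTimestamp(long start, long size) {
        return start + size - 1;
    }

    /**
     * Assigns every word (all processed at the same timestamp) to its window and sums
     * the counts per (word, window), like windowState.add(...) with a summing reduce.
     */
    public static Map<String, Integer> aggregate(String input, long timestamp, long size) {
        // LinkedHashMap only makes printing deterministic; it is not how Flink stores state.
        Map<String, Integer> state = new LinkedHashMap<>();
        long start = windowStart(timestamp, size);
        long end = start + size;
        for (String word : input.trim().split("\\s+")) {
            String stateKey = word + "|" + start + "|" + end; // stand-in for (key, namespace)
            state.merge(stateKey, 1, Integer::sum);            // reduce: add up counts of the same key
        }
        return state;
    }

    public static void main(String[] args) {
        long size = 10_000L;        // Time.seconds(10)
        long ts = 1551500345123L;   // hypothetical processing time of all records
        aggregate("1 2 1 3 2", ts, size).forEach((k, v) -> System.out.println(k + " -> " + v));
        // 1|1551500340000|1551500350000 -> 2
        // 2|1551500340000|1551500350000 -> 2
        // 3|1551500340000|1551500350000 -> 1
    }
}
```

The per-key sums match the primaryTable entries in the question: WordWithCount(1,2), WordWithCount(2,2), WordWithCount(3,1). The state only holds the aggregates and does not decide the emit order.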
InternalTimerServiceImpl.onProcessingTime
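- processingTimeTimersQueue (a HeapPriorityQueueSet) stores all the keys, with duplicates removed. It is a binary min-heap ordered by timer timestamp. Every key in this window registered a timer with the same timestamp, so the keys stay in the order in which they first arrived.
- processingTimeTimersQueue.peek() returns the first element, which is processed next.
- processingTimeTimersQueue.poll() removes the first element, moves the last element into the first position and sets the last slot to null. A heap would normally sift that element down, but with equal timestamps it never moves, so it stays at the head. As a result, the first element is processed first, and after that processing runs from the last element backwards until the queue is empty. For example:
1 2 1 3 2 4 5
is stored as 1 2 3 4 5
and the processing order becomes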
1
5
4
3
2
- keyContext.setCurrentKey(timer.getKey()); // sets the current key, i.e. the key to be processed now
- triggerTarget.onProcessingTime(timer); // calls WindowOperator.onProcessingTime(timer) to process it
queue = {HeapPriorityQueueElement[129]@8184}
 1 = {TimerHeapInternalTimer@12441} "Timer{timestamp=1551505439999, key=(1), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 2 = {TimerHeapInternalTimer@12442} "Timer{timestamp=1551505439999, key=(2), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 3 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
- WindowOperator.onProcessingTime(timer) is called to process the current key:
public void onProcessingTime(long time) throws Exception {
    // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    // inside the callback.
    nextTimer = null;
    InternalTimer<K, N> timer;
    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onProcessingTime(timer);
    }
    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}
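The peek/poll loop above and the heap behaviour described earlier can be checked with a small hypothetical Java model. TimerQueueSketch is not Flink's HeapPriorityQueueSet. It is a 1-based binary min-heap ordered only by timestamp, and it ignores a timer that is already registered. Here a timer is identified by key and timestamp, which simplifies Flink's key/namespace/timestamp. Its onProcessingTime returns the fired keys instead of calling a Triggerable. Sift-down moves an element only when a child is strictly earlier. So when all timestamps are equal, the last element stays at the head once it has been moved there.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of a deduplicating, timestamp-ordered timer heap. */
public class TimerQueueSketch {

    public static final class Timer {
        public final String key;
        public final long timestamp;
        Timer(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
    }

    private Timer[] queue = new Timer[16];   // index 0 unused: the head lives at index 1
    private int size = 0;
    private final Set<String> registered = new HashSet<>();

    /** Registers a timer; registering the same (key, timestamp) twice is a no-op. */
    public void add(String key, long timestamp) {
        if (!registered.add(key + "@" + timestamp)) {
            return; // duplicate timer, Set-like behaviour
        }
        if (size + 1 == queue.length) {
            queue = Arrays.copyOf(queue, queue.length * 2);
        }
        queue[++size] = new Timer(key, timestamp);
        siftUp(size);
    }

    public Timer peek() { return size == 0 ? null : queue[1]; }

    /** Removes the head: the last element is moved to index 1 and then sifted down. */
    public Timer poll() {
        if (size == 0) return null;
        Timer head = queue[1];
        registered.remove(head.key + "@" + head.timestamp);
        queue[1] = queue[size];
        queue[size--] = null;
        if (size > 0) siftDown(1);
        return head;
    }

    private void siftUp(int idx) {
        Timer t = queue[idx];
        while (idx > 1 && queue[idx / 2].timestamp > t.timestamp) { // strictly later: ties never move
            queue[idx] = queue[idx / 2];
            idx /= 2;
        }
        queue[idx] = t;
    }

    private void siftDown(int idx) {
        Timer t = queue[idx];
        while (2 * idx <= size) {
            int child = 2 * idx;
            if (child + 1 <= size && queue[child + 1].timestamp < queue[child].timestamp) child++;
            if (queue[child].timestamp >= t.timestamp) break; // ties never move
            queue[idx] = queue[child];
            idx = child;
        }
        queue[idx] = t;
    }

    /** Like the loop in onProcessingTime: fire every timer due at or before time. */
    public List<String> onProcessingTime(long time) {
        List<String> firedKeys = new ArrayList<>();
        Timer timer;
        while ((timer = peek()) != null && timer.timestamp <= time) {
            poll();
            firedKeys.add(timer.key); // stands in for setCurrentKey + triggerTarget.onProcessingTime
        }
        return firedKeys;
    }

    public static void main(String[] args) {
        TimerQueueSketch q = new TimerQueueSketch();
        long windowMaxTimestamp = 1551505439999L; // same window, so same timer timestamp for every key
        for (String key : "1 2 1 3 2 4 5".split(" ")) {
            q.add(key, windowMaxTimestamp);
        }
        System.out.println(q.onProcessingTime(windowMaxTimestamp)); // prints [1, 5, 4, 3, 2]
    }
}
```

For the question's input 1 2 1 3 2, the same model fires 1, 3, 2. This is the keyContext.getCurrentKey() order seen in the debugger. If the keys had different timer timestamps, sift-down would order them by time instead.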
WindowOperator.onProcessingTime
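- triggerResult.isFire() // the window of the current element can fire, i.e. its end time has passed
- windowState.get() // fetches the (key, value) for the current key; it is already the aggregate of all records with that key
- emitWindowContents(triggerContext.window, contents); // sends the result to the Sink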
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
    triggerContext.key = timer.getKey();
    triggerContext.window = timer.getNamespace();
    MergingWindowSet<W> mergingWindows;
    if (windowAssigner instanceof MergingWindowAssigner) {
        mergingWindows = getMergingWindowSet();
        W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
        if (stateWindow == null) {
            // Timer firing for non-existent window, this can only happen if a
            // trigger did not clean up timers. We have already cleared the merging
            // window and therefore the Trigger state, however, so nothing to do.
            return;
        } else {
            windowState.setCurrentNamespace(stateWindow);
        }
    } else {
        windowState.setCurrentNamespace(triggerContext.window);
        mergingWindows = null;
    }
    TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
    if (triggerResult.isFire()) {
        ACC contents = windowState.get();
        if (contents != null) {
            emitWindowContents(triggerContext.window, contents);
        }
    }
    if (triggerResult.isPurge()) {
        windowState.clear();
    }
    if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
        clearAllState(triggerContext.window, windowState, mergingWindows);
    }
    if (mergingWindows != null) {
        // need to make sure to update the merging state in state
        mergingWindows.persist();
    }
}
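The last step can be sketched the same way. Records reach the Sink in the order the timers fire, regardless of how the state table is laid out. EmitOrderSketch below is hypothetical. The list of fired keys stands in for the timer queue, and the map stands in for windowState. Removing the entry approximates the window state being cleared at the window's cleanup time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: what reaches the sink when timers fire in a given key order. */
public class EmitOrderSketch {

    /**
     * For each fired key (in timer-queue order), read the aggregated window state,
     * emit it if present, then drop it, roughly like WindowOperator.onProcessingTime
     * with a firing trigger followed by clearing the window state at cleanup time.
     */
    public static List<String> emitInTimerOrder(List<String> firedKeys, Map<String, Integer> windowState) {
        List<String> sink = new ArrayList<>();
        for (String key : firedKeys) {
            Integer contents = windowState.get(key);                      // windowState.get()
            if (contents != null) {
                sink.add("WordWithCount(" + key + "," + contents + ")");  // emitWindowContents(...)
            }
            windowState.remove(key);                                      // state cleared afterwards
        }
        return sink;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("1", 2);
        state.put("2", 2);
        state.put("3", 1);
        // Firing order for input "1 2 1 3 2": the head first, then whatever was moved to the head.
        List<String> fired = Arrays.asList("1", "3", "2");
        System.out.println(emitInTimerOrder(fired, state));
        // prints [WordWithCount(1,2), WordWithCount(3,1), WordWithCount(2,2)]
    }
}
```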
On Saturday, 2 March 2019, 12:31:22 pm GMT+8, [email protected] wrote:
[Question]
- When Flink runs locally and a window completes, in what order is the data sent to the sink, and how is that order determined?
-------------------
- Input data: 1 2 1 3 2
- Program: Flink 1.7.2 local WordCount, dataStream.timeWindow(Time.seconds(10))
- Data structure of windowState.stateTable.primaryTable in WindowOperator.onProcessingTime:
167 = {CopyOnWriteStateTable$StateTableEntry@7257} "((1)|TimeWindow{start=1551500340000, end=1551500350000})=WordWithCount(1,2)"
419 = {CopyOnWriteStateTable$StateTableEntry@7258} "((3)|TimeWindow{start=1551500340000, end=1551500350000})=WordWithCount(3,1)"
767 = {CopyOnWriteStateTable$StateTableEntry@7259} "((2)|TimeWindow{start=1551500340000, end=1551500350000})=WordWithCount(2,2)"
- The records are sent to the sink in this order: WordWithCount(1,2) WordWithCount(3,1) WordWithCount(2,2). The question is: how is this order determined?
- keyContext.getCurrentKey() returns the keys in the order 1 3 2. How is the traversal order of these keys determined?
-------------------