[GitHub] [flink] jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer URL: https://github.com/apache/flink/pull/8517#discussion_r287840233 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java ## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Emitter that handles event time synchronization between producer threads. + * + * Records are organized into per producer queues that will block when capacity is exhausted. + * + * Records are emitted by selecting the oldest available element of all producer queues, + * as long as the timestamp does not exceed the current shared watermark plus allowed lookahead interval. 
+ * + * @param <T> + */ +public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class); + + /** +* The default capacity of a single queue. +* +* Larger queue size can lead to higher throughput, but also to +* very high heap space consumption, depending on the size of elements. +* +* Note that this is difficult to tune, because it does not take into account +* the size of individual objects. +*/ + public static final int DEFAULT_QUEUE_CAPACITY = 100; + + private final int queueCapacity; + private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> emptyQueues = new ConcurrentHashMap<>(); + private final PriorityQueue<AsyncRecordQueue<T>> heads = new PriorityQueue<>(this::compareHeadElement); + private volatile boolean running = true; + private volatile long maxEmitTimestamp = Long.MAX_VALUE; + private long maxLookaheadMillis = 60 * 1000; // one minute + private long idleSleepMillis = 100; + private final Object condition = new Object(); + + public RecordEmitter(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + + private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) { + return Long.compare(left.headTimestamp, right.headTimestamp); + } + + /** +* Accepts records from readers. 
+* +* @param <T> +*/ + public interface RecordQueue<T> { + void put(T record) throws InterruptedException; + + int getQueueId(); + + int getSize(); + + T peek(); + } + + private class AsyncRecordQueue<T> implements RecordQueue<T> { + private final ArrayBlockingQueue<T> queue; + private final int queueId; + long headTimestamp; + + private AsyncRecordQueue(int queueId) { + super(); + this.queue = new ArrayBlockingQueue<>(queueCapacity); + this.queueId = queueId; + this.headTimestamp = Long.MAX_VALUE; + } + + public void put(T record) throws InterruptedException { + queue.put(record); + // TODO: not pretty having this here + synchronized (condition) { + condition.notify(); + } + } + + public int getQueueId() { + return queueId; + } + + public int getSize() { + return queue.size(); + } + + public T peek() { + return queue.peek(); + } + + } + + /** +* The queue for the given producer (i.e. Kinesis shard consumer thread). +* +
[GitHub] [flink] jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer URL: https://github.com/apache/flink/pull/8517#discussion_r287840412 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java ## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Emitter that handles event time synchronization between producer threads. + * + * Records are organized into per producer queues that will block when capacity is exhausted. + * + * Records are emitted by selecting the oldest available element of all producer queues, + * as long as the timestamp does not exceed the current shared watermark plus allowed lookahead interval. 
+ * + * @param <T> + */ +public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class); + + /** +* The default capacity of a single queue. +* +* Larger queue size can lead to higher throughput, but also to +* very high heap space consumption, depending on the size of elements. +* +* Note that this is difficult to tune, because it does not take into account +* the size of individual objects. +*/ + public static final int DEFAULT_QUEUE_CAPACITY = 100; + + private final int queueCapacity; + private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> emptyQueues = new ConcurrentHashMap<>(); + private final PriorityQueue<AsyncRecordQueue<T>> heads = new PriorityQueue<>(this::compareHeadElement); + private volatile boolean running = true; + private volatile long maxEmitTimestamp = Long.MAX_VALUE; + private long maxLookaheadMillis = 60 * 1000; // one minute + private long idleSleepMillis = 100; + private final Object condition = new Object(); + + public RecordEmitter(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + + private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) { + return Long.compare(left.headTimestamp, right.headTimestamp); + } + + /** +* Accepts records from readers. 
+* +* @param <T> +*/ + public interface RecordQueue<T> { + void put(T record) throws InterruptedException; + + int getQueueId(); + + int getSize(); + + T peek(); + } + + private class AsyncRecordQueue<T> implements RecordQueue<T> { + private final ArrayBlockingQueue<T> queue; + private final int queueId; + long headTimestamp; + + private AsyncRecordQueue(int queueId) { + super(); + this.queue = new ArrayBlockingQueue<>(queueCapacity); + this.queueId = queueId; + this.headTimestamp = Long.MAX_VALUE; + } + + public void put(T record) throws InterruptedException { + queue.put(record); + // TODO: not pretty having this here + synchronized (condition) { + condition.notify(); + } + } + + public int getQueueId() { + return queueId; + } + + public int getSize() { + return queue.size(); + } + + public T peek() { + return queue.peek(); + } + + } + + /** +* The queue for the given producer (i.e. Kinesis shard consumer thread). +* +