[GitHub] [flink] jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-27 Thread GitBox
jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] 
Shard watermark synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8517#discussion_r287840233
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
 ##
 @@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Emitter that handles event time synchronization between producer threads.
+ *
+ * <p>Records are organized into per producer queues that will block when capacity is exhausted.
+ *
+ * <p>Records are emitted by selecting the oldest available element of all producer queues,
+ * as long as the timestamp does not exceed the current shared watermark plus allowed lookahead interval.
+ *
+ * @param <T>
+ */
+public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
+   private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class);
+
+   /**
+* The default capacity of a single queue.
+*
+* Larger queue size can lead to higher throughput, but also to
+* very high heap space consumption, depending on the size of elements.
+*
+* Note that this is difficult to tune, because it does not take into account
+* the size of individual objects.
+*/
+   public static final int DEFAULT_QUEUE_CAPACITY = 100;
+
+   private final int queueCapacity;
+   private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = new ConcurrentHashMap<>();
+   private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> emptyQueues = new ConcurrentHashMap<>();
+   private final PriorityQueue<AsyncRecordQueue<T>> heads = new PriorityQueue<>(this::compareHeadElement);
+   private volatile boolean running = true;
+   private volatile long maxEmitTimestamp = Long.MAX_VALUE;
+   private long maxLookaheadMillis = 60 * 1000; // one minute
+   private long idleSleepMillis = 100;
+   private final Object condition = new Object();
+
+   public RecordEmitter(int queueCapacity) {
+   this.queueCapacity = queueCapacity;
+   }
+
+   private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) {
+   return Long.compare(left.headTimestamp, right.headTimestamp);
+   }
+
+   /**
+* Accepts records from readers.
+*
+* @param <T>
+*/
+   public interface RecordQueue<T> {
+   void put(T record) throws InterruptedException;
+
+   int getQueueId();
+
+   int getSize();
+
+   T peek();
+   }
+
+   private class AsyncRecordQueue<T> implements RecordQueue<T> {
+   private final ArrayBlockingQueue<T> queue;
+   private final int queueId;
+   long headTimestamp;
+
+   private AsyncRecordQueue(int queueId) {
+   super();
+   this.queue = new ArrayBlockingQueue<>(queueCapacity);
+   this.queueId = queueId;
+   this.headTimestamp = Long.MAX_VALUE;
+   }
+
+   public void put(T record) throws InterruptedException {
+   queue.put(record);
+   // TODO: not pretty having this here
+   synchronized (condition) {
+   condition.notify();
+   }
+   }
+
+   public int getQueueId() {
+   return queueId;
+   }
+
+   public int getSize() {
+   return queue.size();
+   }
+
+   public T peek() {
+   return queue.peek();
+   }
+
+   }
+
+   /**
+* The queue for the given producer (i.e. Kinesis shard consumer thread).
+*
+ 

[GitHub] [flink] jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-27 Thread GitBox
jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] 
Shard watermark synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8517#discussion_r287840412
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
 ##
 @@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Emitter that handles event time synchronization between producer threads.
+ *
+ * <p>Records are organized into per producer queues that will block when capacity is exhausted.
+ *
+ * <p>Records are emitted by selecting the oldest available element of all producer queues,
+ * as long as the timestamp does not exceed the current shared watermark plus allowed lookahead interval.
+ *
+ * @param <T>
+ */
+public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
+   private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class);
+
+   /**
+* The default capacity of a single queue.
+*
+* Larger queue size can lead to higher throughput, but also to
+* very high heap space consumption, depending on the size of elements.
+*
+* Note that this is difficult to tune, because it does not take into account
+* the size of individual objects.
+*/
+   public static final int DEFAULT_QUEUE_CAPACITY = 100;
+
+   private final int queueCapacity;
+   private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = new ConcurrentHashMap<>();
+   private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> emptyQueues = new ConcurrentHashMap<>();
+   private final PriorityQueue<AsyncRecordQueue<T>> heads = new PriorityQueue<>(this::compareHeadElement);
+   private volatile boolean running = true;
+   private volatile long maxEmitTimestamp = Long.MAX_VALUE;
+   private long maxLookaheadMillis = 60 * 1000; // one minute
+   private long idleSleepMillis = 100;
+   private final Object condition = new Object();
+
+   public RecordEmitter(int queueCapacity) {
+   this.queueCapacity = queueCapacity;
+   }
+
+   private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) {
+   return Long.compare(left.headTimestamp, right.headTimestamp);
+   }
+
+   /**
+* Accepts records from readers.
+*
+* @param <T>
+*/
+   public interface RecordQueue<T> {
+   void put(T record) throws InterruptedException;
+
+   int getQueueId();
+
+   int getSize();
+
+   T peek();
+   }
+
+   private class AsyncRecordQueue<T> implements RecordQueue<T> {
+   private final ArrayBlockingQueue<T> queue;
+   private final int queueId;
+   long headTimestamp;
+
+   private AsyncRecordQueue(int queueId) {
+   super();
+   this.queue = new ArrayBlockingQueue<>(queueCapacity);
+   this.queueId = queueId;
+   this.headTimestamp = Long.MAX_VALUE;
+   }
+
+   public void put(T record) throws InterruptedException {
+   queue.put(record);
+   // TODO: not pretty having this here
+   synchronized (condition) {
+   condition.notify();
+   }
+   }
+
+   public int getQueueId() {
+   return queueId;
+   }
+
+   public int getSize() {
+   return queue.size();
+   }
+
+   public T peek() {
+   return queue.peek();
+   }
+
+   }
+
+   /**
+* The queue for the given producer (i.e. Kinesis shard consumer thread).
+*
+