[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-21 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2570


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84255151
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java ---
@@ -35,10 +34,10 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
+ * A {@link ProcessingTimeService} which assigns as current processing time the result of calling
  * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
  */
-public class DefaultTimeServiceProvider extends TimeServiceProvider {
+public class DefaultProcessingTimeService extends ProcessingTimeService {
 
--- End diff --

Done
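The javadoc in this hunk states the mechanism: the current processing time is the result of `System.currentTimeMillis()`, and timers are registered with a `ScheduledThreadPoolExecutor`. A minimal standalone sketch of that idea follows. `TimerCallback` and `WallClockTimerService` are hypothetical names, not Flink's API, and error handling is reduced to printing the stack trace.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical callback invoked when a processing-time timer fires. */
interface TimerCallback {
    void onProcessingTime(long timestamp) throws Exception;
}

/**
 * Hypothetical sketch: processing time comes from the system wall clock and
 * timers are scheduled on a ScheduledThreadPoolExecutor.
 */
class WallClockTimerService {

    private final ScheduledThreadPoolExecutor timerExecutor = new ScheduledThreadPoolExecutor(1);

    /** Current processing time, taken from the system clock. */
    long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    /** Schedules the callback to run once the wall clock reaches {@code timestamp}. */
    ScheduledFuture<?> registerTimer(long timestamp, TimerCallback callback) {
        // A timestamp in the past yields a zero delay, so the timer fires right away.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
        return timerExecutor.schedule(() -> {
            try {
                callback.onProcessingTime(timestamp);
            } catch (Exception e) {
                // A real service would report this to the owning task; the sketch only prints it.
                e.printStackTrace();
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    /** Stops the timer thread and discards timers that have not fired yet. */
    void shutdown() {
        timerExecutor.shutdownNow();
    }
}
```

Computing the delay from the wall clock is what ties such a service to system time; a test variant would have to replace both the clock and the executor.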




[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84251918
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+    * Processing time timers that are currently in-flight.
+    */
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+    * Currently waiting watermark callbacks.
+    */
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers<K, N> restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = restoredTimers.watermarkTimers;
+   watermarkTimersQueue = 

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84251732
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+    * Processing time timers that are currently in-flight.
+    */
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+    * Currently waiting watermark callbacks.
+    */
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
--- End diff --

I'm renaming




[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84236439
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84251996
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84235862
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
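+   // ------------------------------------------------------------------------
+   //  Watermark handling
+   // ------------------------------------------------------------------------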
+
+   /**
+    * Returns a {@link InternalTimerService} that can be used to query current processing time
+    * and event time and to set timers. An operator can have several timer services, where
+    * each has its own namespace serializer. Timer services are differentiated by the string
+    * key that is given when requesting them, if you call this method with the same key
+    * multiple times you will get the same timer service instance in subsequent requests.
+    *
+    * Timers are always scoped to a key, the currently active key of a keyed stream operation.
+    * When a timer fires, this key will also be set as the currently active key.
+    *
+    * Each timer has attached metadata, the namespace. Different timer services
+    * can have a different namespace type. If you don't need namespace differentiation you
+    * can use {@link VoidNamespaceSerializer} as the namespace serializer.
+    *
+    * @param name The name of the requested timer service. If no service exists under the given
+    * name a new one will be created and returned.
+    * @param keySerializer {@code TypeSerializer} for the keys of the timers.
+    * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+    * @param triggerable The {@link Triggerable} that should be invoked when timers fire
+    *
+    * @param <K> The type of the timer keys.
+    * @param <N> The type of the timer namespace.
+    */
+   public <K, N> InternalTimerService<N> getInternalTimerService(
+   String name,
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   Triggerable<K, N> triggerable) {
+
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+   if (service == null) {
+   if (restoredServices != null && restoredServices.containsKey(name)) {
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService.RestoredTimers<K, N> restoredService =
+   (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
+
+   service = new HeapInternalTimerService<>(
+   keySerializer,
+   namespaceSerializer,
+   triggerable,
+   this,
+   getRuntimeContext().getProcessingTimeService(),
+   restoredService);
+
+   } else {
+   service = new HeapInternalTimerService<>(
+   keySerializer,
+   namespaceSerializer,
+   triggerable,
+   this,
+   getRuntimeContext().getProcessingTimeService());
+   }
+   timerServices.put(name, service);
+   }
+
+   return service;
+   }
+
+   public void processWatermark(Watermark mark) throws Exception {
+   for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
+   service.advanceWatermark(mark.getTimestamp());
+   }
+   output.emitWatermark(mark);
+   }
+
+   public void processWatermark1(Watermark mark) throws Exception {
+   input1Watermark = mark.getTimestamp();
+   long newMin = Math.min(input1Watermark, input2Watermark);
--- End diff --

I agree with @StefanRRichter's comment below. I would also add that 
`processWatermark1` and `processWatermark2` repeat much of the same code, so 
the common part could become a private method called by both.
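A minimal sketch of the suggested refactoring, using a standalone hypothetical class rather than the actual `AbstractStreamOperator`: each `processWatermarkN` method only records its own input's watermark and then delegates to one private method. The quoted diff is cut off right after the `Math.min` line, so the rule that the combined watermark is forwarded only when it increases is an assumption of this sketch, as is recording emitted watermarks in a list instead of calling `output.emitWatermark`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical two-input watermark tracker. Only the per-input bookkeeping
 * differs between processWatermark1 and processWatermark2; the shared logic
 * lives in one private method.
 */
class TwoInputWatermarkTracker {

    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Stand-in for emitting downstream: records every forwarded watermark. */
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        advanceCombinedWatermark();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        advanceCombinedWatermark();
    }

    /** Shared logic: forward the minimum of both inputs, but only when it advances (assumed rule). */
    private void advanceCombinedWatermark() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            // In an operator, timer services would be advanced and the watermark emitted here.
            emitted.add(newMin);
        }
    }
}
```

With this split, a fix to the combining rule only has to be made in one place.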



[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84237685
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java ---
@@ -35,10 +34,10 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
+ * A {@link ProcessingTimeService} which assigns as current processing time the result of calling
  * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
  */
-public class DefaultTimeServiceProvider extends TimeServiceProvider {
+public class DefaultProcessingTimeService extends ProcessingTimeService {
 
--- End diff --

We could also rename it to `SystemProcessingTimeService`, or something else 
that better indicates where the clock ticks come from.
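The point of the suggested name is that implementations differ mainly in where their clock ticks come from. A minimal sketch of that split, using hypothetical class names (`ClockSource`, `SystemClockSource`, `ManualClockSource`) rather than Flink's actual classes:

```java
/** Hypothetical minimal abstraction: implementations differ only in the source of processing time. */
abstract class ClockSource {
    abstract long getCurrentProcessingTime();
}

/** Clock ticks come from the operating system's wall clock. */
class SystemClockSource extends ClockSource {
    @Override
    long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }
}

/** Clock ticks come from explicit calls, which keeps tests deterministic. */
class ManualClockSource extends ClockSource {
    private long now;

    ManualClockSource(long start) {
        this.now = start;
    }

    /** Moves processing time forward; going backwards is rejected. */
    void advance(long millis) {
        if (millis < 0) {
            throw new IllegalArgumentException("processing time must not go backwards");
        }
        now += millis;
    }

    @Override
    long getCurrentProcessingTime() {
        return now;
    }
}
```

Naming each implementation after its clock source keeps a wall-clock service and a manually advanced one easy to tell apart at call sites.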




[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84236659
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84237459
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+    * Processing time timers that are currently in-flight.
+    */
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+    * Currently waiting watermark callbacks.
+    */
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
--- End diff --

I am wondering if it would be better to rename this to `EventTimeTimers`. 
That pairs well with `processingTimeTimers` and also makes clear what we are 
talking about: event time, whose "clock ticks" are the watermarks, and 
processing time, whose clock ticks are those of the wall clock.
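This distinction maps onto how the quoted `HeapInternalTimerService` stores timers: each time domain gets a `Set` (so the same timer is registered only once) plus a `PriorityQueue` ordered by timestamp, and event-time timers fire once a watermark reaches their timestamp. A minimal standalone sketch of the event-time half, assuming hypothetical `EventTimer` and `EventTimeTimerQueue` types with `String` keys, no namespaces, and a returned list of fired timers instead of a callback into a `Triggerable`:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical timer: a key and the event time at which it should fire. */
final class EventTimer implements Comparable<EventTimer> {
    final String key;
    final long timestamp;

    EventTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }

    /** Orders timers by timestamp only; ties between keys are broken arbitrarily by the queue. */
    @Override
    public int compareTo(EventTimer other) {
        return Long.compare(timestamp, other.timestamp);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof EventTimer)) {
            return false;
        }
        EventTimer t = (EventTimer) o;
        return timestamp == t.timestamp && key.equals(t.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, timestamp);
    }
}

/** Hypothetical heap-based store of event-time timers that is driven by watermarks. */
class EventTimeTimerQueue {
    // The set rejects duplicate registrations; the queue yields the earliest timer first.
    private final Set<EventTimer> eventTimeTimers = new HashSet<>();
    private final PriorityQueue<EventTimer> eventTimeTimersQueue = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(String key, long timestamp) {
        EventTimer timer = new EventTimer(key, timestamp);
        if (eventTimeTimers.add(timer)) {
            eventTimeTimersQueue.add(timer);
        }
    }

    /** Advances event time and returns, in timestamp order, every timer with timestamp <= watermark. */
    List<EventTimer> advanceWatermark(long watermark) {
        currentWatermark = watermark;
        List<EventTimer> fired = new ArrayList<>();
        EventTimer head;
        while ((head = eventTimeTimersQueue.peek()) != null && head.timestamp <= currentWatermark) {
            eventTimeTimersQueue.poll();
            eventTimeTimers.remove(head);
            fired.add(head);
        }
        return fired;
    }
}
```

A processing-time counterpart would hold the same two structures but be driven by wall-clock callbacks instead of watermarks.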




[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84039288
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers<K, N> restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
--- End diff --

That one's a bit tricky since I only get the `TypeSerializers` once the 
user requests a 

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84038623
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
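+   // ------------------------------------------------------------------------
+   //  Watermark handling
+   // ------------------------------------------------------------------------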
+
+   /**
+* Returns a {@link InternalTimerService} that can be used to query current processing time
+* and event time and to set timers. An operator can have several timer services, where
+* each has its own namespace serializer. Timer services are differentiated by the string
+* key that is given when requesting them, if you call this method with the same key
+* multiple times you will get the same timer service instance in subsequent requests.
+*
+* Timers are always scoped to a key, the currently active key of a keyed stream operation.
+* When a timer fires, this key will also be set as the currently active key.
+*
+* Each timer has attached metadata, the namespace. Different timer services
+* can have a different namespace type. If you don't need namespace differentiation you
+* can use {@link VoidNamespaceSerializer} as the namespace serializer.
+*
+* @param name The name of the requested timer service. If no service exists under the given
+* name a new one will be created and returned.
+* @param keySerializer {@code TypeSerializer} for the keys of the timers.
+* @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+* @param triggerable The {@link Triggerable} that should be invoked when timers fire
+*
+* @param <K> The type of the timer keys.
+* @param <N> The type of the timer namespace.
+*/
+   public <K, N> InternalTimerService<N> getInternalTimerService(
+   String name,
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   Triggerable<K, N> triggerable) {
+
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+   if (service == null) {
+   if (restoredServices != null && restoredServices.containsKey(name)) {
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService.RestoredTimers<K, N> restoredService =
--- End diff --

I can replace it with this, but I'm not sure it's more readable:
```
@SuppressWarnings("unchecked")
HeapInternalTimerService.RestoredTimers<K, N> restoredService =
  restoredServices == null ? null : (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);

if (restoredService != null) {
  ...
}
```
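The suggestion above relies on `Map.remove(key)` returning `null` when no entry exists, so the `containsKey()`/`remove()` pair collapses into a single call that also consumes the restored entry. A minimal, self-contained sketch of that get-or-create pattern follows; `NamedServiceRegistry` and its `Service` class are hypothetical placeholders (plain strings stand in for Flink's `HeapInternalTimerService` and `RestoredTimers`).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical registry: services are created lazily by name, and restored state
 * for a name is handed over at most once via Map.remove().
 */
final class NamedServiceRegistry {

    /** Hypothetical stand-in for a timer service; remembers what it was restored from. */
    static final class Service {
        final String name;
        final String restoredState; // null if the service was created fresh

        Service(String name, String restoredState) {
            this.name = name;
            this.restoredState = restoredState;
        }
    }

    private final Map<String, Service> services = new HashMap<>();
    private final Map<String, String> restoredStates; // may be null if nothing was restored

    NamedServiceRegistry(Map<String, String> restoredStates) {
        this.restoredStates = restoredStates;
    }

    Service getService(String name) {
        Service service = services.get(name);
        if (service == null) {
            // remove() returns null for a missing key, so no separate containsKey() check is needed.
            String restored = restoredStates == null ? null : restoredStates.remove(name);
            service = new Service(name, restored);
            services.put(name, service);
        }
        return service;
    }
}
```

Requesting the same name twice returns the cached instance, and the restored entry is consumed on the first request, matching the "same key, same instance" contract described in the quoted Javadoc.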


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84038706
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
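+   // ------------------------------------------------------------------------
+   //  Watermark handling
+   // ------------------------------------------------------------------------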
+
+   /**
+* Returns a {@link InternalTimerService} that can be used to query current processing time
+* and event time and to set timers. An operator can have several timer services, where
+* each has its own namespace serializer. Timer services are differentiated by the string
+* key that is given when requesting them, if you call this method with the same key
+* multiple times you will get the same timer service instance in subsequent requests.
+*
+* Timers are always scoped to a key, the currently active key of a keyed stream operation.
+* When a timer fires, this key will also be set as the currently active key.
+*
+* Each timer has attached metadata, the namespace. Different timer services
+* can have a different namespace type. If you don't need namespace differentiation you
+* can use {@link VoidNamespaceSerializer} as the namespace serializer.
+*
+* @param name The name of the requested timer service. If no service exists under the given
+* name a new one will be created and returned.
+* @param keySerializer {@code TypeSerializer} for the keys of the timers.
+* @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+* @param triggerable The {@link Triggerable} that should be invoked when timers fire
+*
+* @param <K> The type of the timer keys.
+* @param <N> The type of the timer namespace.
+*/
+   public <K, N> InternalTimerService<N> getInternalTimerService(
+   String name,
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   Triggerable<K, N> triggerable) {
+
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+   if (service == null) {
+   if (restoredServices != null && restoredServices.containsKey(name)) {
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService.RestoredTimers<K, N> restoredService =
+   (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
+
+   service = new HeapInternalTimerService<>(
+   keySerializer,
+   namespaceSerializer,
+   triggerable,
+   this,
+   getRuntimeContext().getProcessingTimeService(),
+   restoredService);
+
+   } else {
+   service = new HeapInternalTimerService<>(
+   keySerializer,
+   namespaceSerializer,
+   triggerable,
+   this,
+   getRuntimeContext().getProcessingTimeService());
+   }
+   timerServices.put(name, service);
+   }
+
+   return service;
+   }
+
+   public void processWatermark(Watermark mark) throws Exception {
+   for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
+   service.advanceWatermark(mark.getTimestamp());
+   }
+   output.emitWatermark(mark);
+   }
+
+   public void processWatermark1(Watermark mark) throws Exception {
+   input1Watermark = mark.getTimestamp();
+   long newMin = Math.min(input1Watermark, input2Watermark);
+   if (newMin > combinedWatermark) {
+   combinedWatermark = newMin;
+   processWatermark(new Watermark(combinedWatermark));
+   }
+   }
+
+   public void processWatermark2(Watermark mark) throws Exception {
--- End diff --

Yep, I'm already trying to address that in another PR for changing the
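For reference, the two-input logic quoted in the diff above (`processWatermark1`) can be sketched on its own: the operator's event time is the minimum of both inputs' watermarks, and a watermark is forwarded only when that minimum increases. `TwoInputWatermarkTracker` and its `maybeAdvance()` helper are hypothetical; factoring the shared logic into one helper is just one possible way to avoid duplicating it across `processWatermark1` and `processWatermark2`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of combined watermark tracking for a two-input operator.
 * Both inputs start at Long.MIN_VALUE, so nothing is forwarded until each input
 * has seen at least one watermark.
 */
final class TwoInputWatermarkTracker {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Watermarks that would be forwarded downstream, in order. */
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        maybeAdvance();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        maybeAdvance();
    }

    // Shared by both inputs: forward the minimum only when it moves forward.
    private void maybeAdvance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark);
        }
    }
}
```

For example, watermarks 10 on input 1, then 5 and 12 on input 2, then 11 on input 1 forward 5, 10 and 11.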

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84036274
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for working with time and timers.
+ *
+ * This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
+ * that allows to specify a key and a namespace to which timers should be scoped.
+ *
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface InternalTimerService<N> {
+
+   /** Returns the current processing time. */
+   long currentProcessingTime();
+
+   /** Returns the current event time. */
+   long currentWatermark();
--- End diff --

Yep, I will rename everything to `currentWatermark()`, because that's also
what we already use in other places.


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84035970
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
+ * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
+ * and arrays.
+ *
+ * A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
+ * to them firing.
+ *
+ * {@code
+ * DataStream input = ...;
+ *
+ * DataStream result = input.flatMap(new MyTimelyFlatMapFunction());
+ * }
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
+
+   /**
+* The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
+* it into zero, one, or more elements.
+*
+* @param value The input value.
+* @param timerService A {@link TimerService} that allows setting timers and querying the
+*current time.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
+
+   /**
+* Called when a timer set using {@link TimerService} fires.
+*
+* @param timestamp The timestamp of the firing timer.
+* @param timeDomain The {@link TimeDomain} of the firing timer.
+* @param timerService A {@link TimerService} that allows setting timers and querying the
+*current time.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ;
--- End diff --

I initially had it with an initialisation method that would receive the
`TimerService`, but then went for this version because with it, users who
don't use a `RichFunction` don't need an extra field to store the timer service.

Another alternative would be to bundle all the parameters (timestamp, time
domain, timer service) into a single context parameter.
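The context-parameter alternative could look roughly like the following sketch. `OnTimerContext`, `ContextTimelyFlatMapFunction` and `TimeoutFunction` are hypothetical names, and `TimeDomain`, `TimerService` and `Collector` are minimal stand-ins for the Flink types so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for the Flink types (assumptions, reduced to what the sketch needs).
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

interface TimerService {
    long currentProcessingTime();
    long currentWatermark();
    void registerEventTimeTimer(long time);
}

interface Collector<T> {
    void collect(T record);
}

// Hypothetical context object bundling the timer-related arguments of onTimer().
interface OnTimerContext {
    long timestamp();
    TimeDomain timeDomain();
    TimerService timerService();
}

// Hypothetical variant of TimelyFlatMapFunction that takes the context instead of three parameters.
interface ContextTimelyFlatMapFunction<I, O> {
    void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;

    void onTimer(OnTimerContext ctx, Collector<O> out) throws Exception;
}

// Example user function: forwards each element and reports when its timer fires.
class TimeoutFunction implements ContextTimelyFlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, TimerService timerService, Collector<String> out) {
        // Request a callback 10 time units after the current watermark.
        timerService.registerEventTimeTimer(timerService.currentWatermark() + 10);
        out.collect(value);
    }

    @Override
    public void onTimer(OnTimerContext ctx, Collector<String> out) {
        out.collect("timer@" + ctx.timestamp() + " in " + ctx.timeDomain());
    }
}
```

One trade-off: a context object lets accessors be added later without changing the `onTimer` signature, at the cost of one more type for users to learn.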


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84035592
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+   @Test
+   public void testCurrentEventTime() throws Exception {
+
+   StreamTimelyFlatMap operator =
+   new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+
+   OneInputStreamOperatorTestHarness testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.processWatermark(new Watermark(17));
+   testHarness.processElement(new StreamRecord<>(5, 12L));
+
+   testHarness.processWatermark(new Watermark(42));
+   testHarness.processElement(new StreamRecord<>(6, 13L));
+
+   ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new Watermark(17L));
+   expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
+   expectedOutput.add(new Watermark(42L));
+   expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
+
+   TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+   testHarness.close();
+   }
+
+   @Test
+   public void testCurrentProcessingTime() throws Exception {
+
+   StreamTimelyFlatMap operator =
+   new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+
+   OneInputStreamOperatorTestHarness testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(17);
+   testHarness.processElement(new StreamRecord<>(5));
+
+   testHarness.setProcessingTime(42);
+   testHarness.processElement(new StreamRecord<>(6));
+
+   ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new StreamRecord<>("5PT:17"));
+ 

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84031809
  

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84031706
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
 ---
@@ -15,38 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.streaming.api.operators;
 
-package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.annotation.Internal;
 
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import java.util.concurrent.ScheduledFuture;
-
-class NoOpTimerService extends TimeServiceProvider {
-
-   private volatile boolean terminated;
-
-   @Override
-   public long getCurrentProcessingTime() {
-   return System.currentTimeMillis();
-   }
-
-   @Override
-   public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
-   return null;
-   }
-
-   @Override
-   public boolean isTerminated() {
-   return terminated;
-   }
-
-   @Override
-   public void quiesceAndAwaitPending() {}
-
-   @Override
-   public void shutdownService() {
-   terminated = true;
-   }
+/**
+ * Interface for things that can be called by {@link InternalTimerService}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface Triggerable<K, N> {
--- End diff --

Done, changed it to `ProcessingTimeCallback`, which has a method 
`onProcessingTime()`.
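A minimal sketch of the renamed callback, assuming `onProcessingTime()` receives the timer's timestamp as a `long` (the comment only names the method). `SimpleProcessingTimeService` is hypothetical: it uses `System.currentTimeMillis()` as the clock and a `ScheduledExecutorService` to fire callbacks, and is not Flink's implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Callback with a single method, invoked with the timestamp the timer was registered for.
interface ProcessingTimeCallback {
    void onProcessingTime(long timestamp) throws Exception;
}

// Hypothetical minimal processing-time service backed by a scheduled executor.
final class SimpleProcessingTimeService {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    ScheduledFuture<?> registerTimer(final long timestamp, final ProcessingTimeCallback target) {
        // Timestamps in the past fire as soon as possible.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
        return executor.schedule(() -> {
            try {
                target.onProcessingTime(timestamp);
            } catch (Exception e) {
                // A real service would report this to the owning task; the sketch only prints it.
                e.printStackTrace();
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    void shutdownService() {
        executor.shutdownNow();
    }
}
```

Because the interface has a single abstract method, callers can register a lambda such as `ts -> System.out.println(ts)`.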


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83854971
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
 ---
@@ -15,38 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.streaming.api.operators;
 
-package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.annotation.Internal;
 
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import java.util.concurrent.ScheduledFuture;
-
-class NoOpTimerService extends TimeServiceProvider {
-
-   private volatile boolean terminated;
-
-   @Override
-   public long getCurrentProcessingTime() {
-   return System.currentTimeMillis();
-   }
-
-   @Override
-   public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
-   return null;
-   }
-
-   @Override
-   public boolean isTerminated() {
-   return terminated;
-   }
-
-   @Override
-   public void quiesceAndAwaitPending() {}
-
-   @Override
-   public void shutdownService() {
-   terminated = true;
-   }
+/**
+ * Interface for things that can be called by {@link InternalTimerService}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface Triggerable<K, N> {
--- End diff --

I found this name a bit confusing; it could be mistaken for the concept of
Triggers. Furthermore, there is another class with the same simple name in a
different package. Maybe this could be called `TimerCallback`?


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83862277
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
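+   // ------------------------------------------------------------------------
+   //  Watermark handling
+   // ------------------------------------------------------------------------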
+
+   /**
+* Returns a {@link InternalTimerService} that can be used to query 
current processing time
+* and event time and to set timers. An operator can have several timer 
services, where
+* each has its own namespace serializer. Timer services are 
differentiated by the string
+* key that is given when requesting them, if you call this method with 
the same key
+* multiple times you will get the same timer service instance in 
subsequent requests.
+*
+* Timers are always scoped to a key, the currently active key of a 
keyed stream operation.
+* When a timer fires, this key will also be set as the currently 
active key.
+*
+* Each timer has attached metadata, the namespace. Different timer 
services
+* can have a different namespace type. If you don't need namespace 
differentiation you
+* can use {@link VoidNamespaceSerializer} as the namespace serializer.
+*
+* @param name The name of the requested timer service. If no service 
exists under the given
+* name a new one will be created and returned.
+* @param keySerializer {@code TypeSerializer} for the keys of the 
timers.
+* @param namespaceSerializer {@code TypeSerializer} for the timer 
namespace.
+* @param triggerable The {@link Triggerable} that should be invoked 
when timers fire
+*
+* @param  The type of the timer keys.
+* @param  The type of the timer namespace.
+*/
+   public  InternalTimerService getInternalTimerService(
+   String name,
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   Triggerable triggerable) {
+
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService service = 
(HeapInternalTimerService) timerServices.get(name);
+
+   if (service == null) {
+   if (restoredServices != null && restoredServices.containsKey(name)) {
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService.RestoredTimers restoredService =
+   (HeapInternalTimerService.RestoredTimers) restoredServices.remove(name);
+
+   service = new HeapInternalTimerService<>(
+   keySerializer,
+   namespaceSerializer,
+   triggerable,
+   this,
+   getRuntimeContext().getProcessingTimeService(),
+   restoredService);
+
+   } else {
+   service = new HeapInternalTimerService<>(
+   keySerializer,
+   namespaceSerializer,
+   triggerable,
+   this,
+   getRuntimeContext().getProcessingTimeService());
+   }
+   timerServices.put(name, service);
+   }
+
+   return service;
+   }
+
+   public void processWatermark(Watermark mark) throws Exception {
+   for (HeapInternalTimerService service : timerServices.values()) {
+   service.advanceWatermark(mark.getTimestamp());
+   }
+   output.emitWatermark(mark);
+   }
+
+   public void processWatermark1(Watermark mark) throws Exception {
+   input1Watermark = mark.getTimestamp();
+   long newMin = Math.min(input1Watermark, input2Watermark);
+   if (newMin > combinedWatermark) {
+   combinedWatermark = newMin;
+   processWatermark(new Watermark(combinedWatermark));
+   }
+   }
+
+   public void processWatermark2(Watermark mark) throws Exception {
--- End diff --

As a general comment, somehow I don't like how two cases (one and 
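For context on the two-input case: `processWatermark1` records the last watermark of its input and forwards the minimum over both inputs, but only when that minimum advances. The body of `processWatermark2` is cut off above; the sketch below assumes it mirrors `processWatermark1` for the second input and that both per-input watermarks start at `Long.MIN_VALUE`. `CombinedWatermarkTracker` is a hypothetical name, and emitted watermarks are recorded in a list instead of being passed to an operator output.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical standalone version of the two-input watermark bookkeeping quoted in the diff. */
class CombinedWatermarkTracker {
    // Assumption: both inputs start "infinitely early".
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Watermarks that would have been forwarded downstream. */
    private final List<Long> emitted = new ArrayList<>();

    public void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        maybeAdvance();
    }

    public void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        maybeAdvance();
    }

    // Event time can only advance as far as the slower of the two inputs.
    private void maybeAdvance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark);
        }
    }

    public List<Long> getEmitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CombinedWatermarkTracker tracker = new CombinedWatermarkTracker();
        tracker.processWatermark1(10); // input 2 still at MIN_VALUE: nothing emitted
        tracker.processWatermark2(5);  // min(10, 5) = 5: emit 5
        tracker.processWatermark2(20); // min(10, 20) = 10: emit 10
        tracker.processWatermark1(15); // min(15, 20) = 15: emit 15
        System.out.println(tracker.getEmitted()); // prints [5, 10, 15]
    }
}
```

The single-input `processWatermark` in the diff has no such bookkeeping; it advances all timer services and forwards the watermark unchanged.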

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83861268
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
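+   // ------------------------------------------------------------------------
+   //  Watermark handling
+   // ------------------------------------------------------------------------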
+
+   /**
+* Returns a {@link InternalTimerService} that can be used to query current processing time
+* and event time and to set timers. An operator can have several timer services, where
+* each has its own namespace serializer. Timer services are differentiated by the string
+* key that is given when requesting them, if you call this method with the same key
+* multiple times you will get the same timer service instance in subsequent requests.
+*
+* Timers are always scoped to a key, the currently active key of a keyed stream operation.
+* When a timer fires, this key will also be set as the currently active key.
+*
+* Each timer has attached metadata, the namespace. Different timer services
+* can have a different namespace type. If you don't need namespace differentiation you
+* can use {@link VoidNamespaceSerializer} as the namespace serializer.
+*
+* @param name The name of the requested timer service. If no service exists under the given
+* name a new one will be created and returned.
+* @param keySerializer {@code TypeSerializer} for the keys of the timers.
+* @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+* @param triggerable The {@link Triggerable} that should be invoked when timers fire
+* @param  The type of the timer keys.
+* @param  The type of the timer namespace.
+*/
+   public  InternalTimerService getInternalTimerService(
+   String name,
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   Triggerable triggerable) {
+
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService service = (HeapInternalTimerService) timerServices.get(name);
+
+   if (service == null) {
+   if (restoredServices != null && restoredServices.containsKey(name)) {
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService.RestoredTimers restoredService =
--- End diff --

`contains()` + `remove()` seems a bit redundant for this use. I would just 
always remove and check the return value for null. 
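The suggestion relies on `Map.remove(key)` returning the previous value for the key, or `null` if there was none, so a single call both checks for and claims restored state. Below is a minimal sketch of the get-or-create lookup written that way. `Service` and `ServiceRegistry` are hypothetical, simplified stand-ins for the timer services and `getInternalTimerService`; restored state is modeled as a plain `String`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical placeholder for a timer service; only records where its state came from. */
final class Service {
    final String name;
    final String restoredState; // null when created without restored state

    Service(String name, String restoredState) {
        this.name = name;
        this.restoredState = restoredState;
    }
}

/** Hypothetical, simplified stand-in for the lookup in getInternalTimerService. */
final class ServiceRegistry {
    private final Map<String, Service> services = new HashMap<>();
    // Always non-null here, so the original "restoredServices != null" check is not needed.
    private final Map<String, String> restoredServices;

    ServiceRegistry(Map<String, String> restoredServices) {
        this.restoredServices = new HashMap<>(restoredServices);
    }

    Service getOrCreate(String name) {
        Service service = services.get(name);
        if (service == null) {
            // One remove() replaces containsKey() + remove(): it returns the restored
            // state (and claims it) or null if nothing was restored under this name.
            // Assumes null is never stored as a value in restoredServices.
            String restored = restoredServices.remove(name);
            service = new Service(name, restored);
            services.put(name, service);
        }
        // Repeated requests with the same name return the same instance.
        return service;
    }

    int unclaimedRestoredCount() {
        return restoredServices.size();
    }
}
```

The single-lookup form is only unambiguous if the map never holds `null` values; otherwise `null` could mean either "absent" or "present with a null value".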


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83858268
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Internal class for keeping track of in-flight timers.
+ *
+ * @param  Type of the keys to which timers are scoped.
+ * @param  Type of the namespace to which timers are scoped.
+ */
+@Internal
+public class InternalTimer implements Comparable> {
+   private final long timestamp;
+   private final K key;
+   private final N namespace;
+
+   public InternalTimer(long timestamp, K key, N namespace) {
+   this.timestamp = timestamp;
+   this.key = key;
+   this.namespace = namespace;
+   }
+
+   public long getTimestamp() {
+   return timestamp;
+   }
+
+   public K getKey() {
+   return key;
+   }
+
+   public N getNamespace() {
+   return namespace;
+   }
+
+   @Override
+   public int compareTo(InternalTimer o) {
+   return Long.compare(this.timestamp, o.timestamp);
--- End diff --

`compareTo` is not consistent with `equals`, which is permitted but strongly 
discouraged by the documentation of `Comparable`.
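To illustrate the concern: an ordering that looks only at the timestamp makes two timers for different keys at the same time compare as equal, even when `equals` says they differ. Sorted collections such as `TreeSet` use `compareTo` rather than `equals`, so they silently drop one of them, while `HashSet` and `PriorityQueue` keep both. `SimpleTimer` below is a hypothetical simplification of `InternalTimer` (the diff is cut off before any `equals` override, so an `equals` over timestamp and key is assumed).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical simplification of InternalTimer: ordering uses only the timestamp. */
final class SimpleTimer implements Comparable<SimpleTimer> {
    final long timestamp;
    final String key;

    SimpleTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }

    @Override
    public int compareTo(SimpleTimer o) {
        return Long.compare(this.timestamp, o.timestamp); // ignores the key
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleTimer)) {
            return false;
        }
        SimpleTimer other = (SimpleTimer) o;
        return timestamp == other.timestamp && key.equals(other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key);
    }
}

final class ConsistencyDemo {
    /** Sizes of a HashSet, a TreeSet and a PriorityQueue after adding two same-time timers. */
    static int[] sizes() {
        SimpleTimer a = new SimpleTimer(10L, "alice");
        SimpleTimer b = new SimpleTimer(10L, "bob");

        Set<SimpleTimer> hashSet = new HashSet<>(Arrays.asList(a, b));  // equals/hashCode: keeps both
        Set<SimpleTimer> treeSet = new TreeSet<>(Arrays.asList(a, b));  // compareTo: b looks like a duplicate
        PriorityQueue<SimpleTimer> queue = new PriorityQueue<>(Arrays.asList(a, b)); // ties are allowed
        return new int[] {hashSet.size(), treeSet.size(), queue.size()};
    }
}
```

`sizes()` yields 2, 1 and 2: the inconsistency is harmless in a `PriorityQueue` but would lose timers in any sorted set or map keyed by the timer.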


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83857663
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
+ * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
+ * and arrays.
+ *
+ * A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
+ * to them firing.
+ *
+ * {@code
+ * DataStream input = ...;
+ *
+ * DataStream result = input.flatMap(new MyTimelyFlatMapFunction());
+ * }
+ *
+ * @param  Type of the input elements.
+ * @param  Type of the returned elements.
+ */
+@PublicEvolving
+public interface TimelyFlatMapFunction extends Function, Serializable {
+
+   /**
+* The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
+* it into zero, one, or more elements.
+*
+* @param value The input value.
+* @param timerService A {@link TimerService} that allows setting timers and querying the
+*current time.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   void flatMap(I value, TimerService timerService, Collector out) throws Exception;
+
+   /**
+* Called when a timer set using {@link TimerService} fires.
+*
+* @param timestamp The timestamp of the firing timer.
+* @param timeDomain The {@link TimeDomain} of the firing timer.
+* @param timerService A {@link TimerService} that allows setting timers and querying the
+*current time.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector out) throws Exception;
--- End diff --

I wonder if `TimeDomain` and `TimerService` should be parameters to the methods 
in this interface. I assume both remain stable for the lifetime of the UDF and 
could be passed once in some init method, which could also be pre-implemented in a 
`RichTimelyFlatMapFunction`. Maybe there is a good reason against this, but I 
like to keep the number of parameters small when possible.
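One possible reading of this proposal, sketched with hypothetical stand-in types (`SimpleTimerService`, `SimpleCollector`, `SimpleTimeDomain`, `RichTimelyFlatMapSketch` and `setTimerService` are not Flink API): the operator hands the timer service to the function once, and a rich base class stores it. Since the javadoc above describes `timeDomain` as the domain *of the firing timer*, it may differ between calls, so the sketch keeps it as an `onTimer` parameter.

```java
/** Hypothetical minimal timer service: only what the sketch needs. */
interface SimpleTimerService {
    long currentProcessingTime();

    void registerProcessingTimeTimer(long time);
}

/** Hypothetical collector stand-in. */
interface SimpleCollector<T> {
    void collect(T record);
}

/** Hypothetical time domain stand-in. */
enum SimpleTimeDomain { EVENT_TIME, PROCESSING_TIME }

/** Hypothetical rich base class: the timer service is injected once instead of per call. */
abstract class RichTimelyFlatMapSketch<I, O> {
    private SimpleTimerService timerService;

    /** Would be called once by the operator, for example before open(). */
    final void setTimerService(SimpleTimerService timerService) {
        this.timerService = timerService;
    }

    protected final SimpleTimerService getTimerService() {
        return timerService;
    }

    abstract void flatMap(I value, SimpleCollector<O> out) throws Exception;

    // The time domain stays a parameter because it describes the timer that fired.
    abstract void onTimer(long timestamp, SimpleTimeDomain timeDomain, SimpleCollector<O> out) throws Exception;
}

/** Example function: emits each value and schedules a processing-time timer 10 ms later. */
class ReminderFunction extends RichTimelyFlatMapSketch<String, String> {
    @Override
    void flatMap(String value, SimpleCollector<String> out) {
        out.collect(value);
        getTimerService().registerProcessingTimeTimer(getTimerService().currentProcessingTime() + 10);
    }

    @Override
    void onTimer(long timestamp, SimpleTimeDomain timeDomain, SimpleCollector<String> out) {
        out.collect("timer@" + timestamp + " " + timeDomain);
    }
}
```

The trade-off is that the function now holds mutable, operator-provided state, so the operator must guarantee the setter runs before the first `flatMap` call.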


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83860194
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService implements InternalTimerService, Triggerable {
+
+   private final TypeSerializer keySerializer;
+
+   private final TypeSerializer namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue> processingTimeTimersQueue;
+   private final Set> processingTimeTimers;
+
+   protected ScheduledFuture nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set> watermarkTimers;
+   private final PriorityQueue> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = restoredTimers.watermarkTimers;
+   watermarkTimersQueue = 

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83856028
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+   @Test
+   public void testCurrentEventTime() throws Exception {
+
+   StreamTimelyFlatMap operator =
+   new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+
+   OneInputStreamOperatorTestHarness testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.processWatermark(new Watermark(17));
+   testHarness.processElement(new StreamRecord<>(5, 12L));
+
+   testHarness.processWatermark(new Watermark(42));
+   testHarness.processElement(new StreamRecord<>(6, 13L));
+
+   ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new Watermark(17L));
+   expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
+   expectedOutput.add(new Watermark(42L));
+   expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
+
+   TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+   testHarness.close();
+   }
+
+   @Test
+   public void testCurrentProcessingTime() throws Exception {
+
+   StreamTimelyFlatMap operator =
+   new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+
+   OneInputStreamOperatorTestHarness testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(17);
+   testHarness.processElement(new StreamRecord<>(5));
+
+   testHarness.setProcessingTime(42);
+   testHarness.processElement(new StreamRecord<>(6));
+
+   ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new StreamRecord<>("5PT:17"));
+   

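The expected output of `testCurrentEventTime` (`"5WM:17"` after a watermark of 17, `"6WM:42"` after 42) suggests that the test's `WatermarkQueryingFlatMapFunction`, whose body is not shown in these excerpts, appends the current event time, i.e. the last watermark, to each input value. The hypothetical simulation below reproduces only that inferred expectation, with a plain `long` field in place of a real timer service; it is a guess at the behavior, not the test's code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the expectations in testCurrentEventTime. */
final class WatermarkQuerySimulation {
    private long currentWatermark = Long.MIN_VALUE;
    private final List<String> output = new ArrayList<>();

    void processWatermark(long watermark) {
        currentWatermark = watermark;
        output.add("WM(" + watermark + ")"); // the operator forwards watermarks downstream
    }

    void processElement(int value) {
        // Inferred behavior: tag each value with the event time current at that moment.
        output.add(value + "WM:" + currentWatermark);
    }

    List<String> getOutput() {
        return output;
    }
}
```

Element timestamps (12 and 13 in the test) do not influence the tag; only the most recent watermark does.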
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83859446
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService implements InternalTimerService, Triggerable {
+
+   private final TypeSerializer keySerializer;
+
+   private final TypeSerializer namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue> processingTimeTimersQueue;
+   private final Set> processingTimeTimers;
+
+   protected ScheduledFuture nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set> watermarkTimers;
+   private final PriorityQueue> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
--- End diff --

RestoredTimers are serialized with their type serializers. It could make 
sense to have some 

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83858860
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for working with time and timers.
+ *
+ * This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
+ * that allows to specify a key and a namespace to which timers should be scoped.
+ *
+ * @param  Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface InternalTimerService {
+
+   /** Returns the current processing time. */
+   long currentProcessingTime();
+
+   /** Returns the current event time. */
+   long currentWatermark();
--- End diff --

The corresponding method in the public interface is called 
`currentEventTime`. Does it make sense to keep both method names consistent? 
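If the names stay different, the mismatch surfaces wherever the public service is backed by the internal one: a thin adapter has to forward `currentEventTime()` to `currentWatermark()`. The interfaces below (`InternalTimeService`, `PublicTimerService`, `PublicTimerServiceAdapter`) are hypothetical, cut-down stand-ins for the PR's types, used only to show where the two names meet.

```java
/** Hypothetical cut-down version of the internal interface quoted above. */
interface InternalTimeService {
    long currentProcessingTime();

    /** Returns the current event time. */
    long currentWatermark();
}

/** Hypothetical cut-down version of the public interface. */
interface PublicTimerService {
    long currentProcessingTime();

    long currentEventTime();
}

/** Adapter: the only place where the two names have to be mapped onto each other. */
final class PublicTimerServiceAdapter implements PublicTimerService {
    private final InternalTimeService internal;

    PublicTimerServiceAdapter(InternalTimeService internal) {
        this.internal = internal;
    }

    @Override
    public long currentProcessingTime() {
        return internal.currentProcessingTime();
    }

    @Override
    public long currentEventTime() {
        // Same value as the internal watermark, exposed under a different name.
        return internal.currentWatermark();
    }
}
```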


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83860732
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
 ---
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link HeapInternalTimerService}.
+ */
+public class HeapInternalTimerServiceTest {
+
+   private static InternalTimer anyInternalTimer() {
+   return any();
+   }
+
+   /**
+* Verify that we only ever have one processing-time task registered at the
+* {@link ProcessingTimeService}.
+*/
+   @Test
+   public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
+   @SuppressWarnings("unchecked")
+   Triggerable mockTriggerable = mock(Triggerable.class);
+
+   TestKeyContext keyContext = new TestKeyContext();
+
+   TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+   HeapInternalTimerService timerService =
+   createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+   keyContext.setCurrentKey(0);
+
+   timerService.registerProcessingTimeTimer("ciao", 10);
+   timerService.registerProcessingTimeTimer("ciao", 20);
+   timerService.registerProcessingTimeTimer("ciao", 30);
+   timerService.registerProcessingTimeTimer("hello", 10);
+   timerService.registerProcessingTimeTimer("hello", 20);
+
+   assertEquals(5, timerService.numProcessingTimeTimers());
+   assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+   assertEquals(3, timerService.numProcessingTimeTimers("ciao"));
+
+   assertEquals(1, processingTimeService.getNumRegisteredTimers());
+   assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+
+   processingTimeService.setCurrentTime(10);
+
+   assertEquals(3, timerService.numProcessingTimeTimers());
+   assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+   assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+
+   assertEquals(1, processingTimeService.getNumRegisteredTimers());
+   assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+   processingTimeService.setCurrentTime(20);
+
+   assertEquals(1, timerService.numProcessingTimeTimers());
+   assertEquals(0, timerService.numProcessingTimeTimers("hello"));
+   assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+
+   assertEquals(1, 

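The test above checks that, however many logical timers are registered, only one physical task (for the earliest timestamp) is scheduled with the `ProcessingTimeService`. A minimal sketch of that idea with a manual clock follows: a `PriorityQueue` orders the logical timers and a `HashSet` rejects duplicates, mirroring the two collections declared in `HeapInternalTimerService`, while a `Set<Long>` stands in for scheduled tasks. `OnePhysicalTimerSketch` and its methods are hypothetical names, and the rescheduling rule is an assumption, not the PR's implementation.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical sketch: many logical timers, but at most one physical (scheduled) timer. */
final class OnePhysicalTimerSketch {

    /** A logical timer: timestamp plus namespace, like ("ciao", 10) in the test. */
    static final class LogicalTimer implements Comparable<LogicalTimer> {
        final long timestamp;
        final String namespace;

        LogicalTimer(long timestamp, String namespace) {
            this.timestamp = timestamp;
            this.namespace = namespace;
        }

        @Override
        public int compareTo(LogicalTimer o) {
            return Long.compare(timestamp, o.timestamp);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof LogicalTimer)) {
                return false;
            }
            LogicalTimer other = (LogicalTimer) o;
            return timestamp == other.timestamp && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, namespace);
        }
    }

    private final PriorityQueue<LogicalTimer> queue = new PriorityQueue<>(); // ordered by time
    private final Set<LogicalTimer> registered = new HashSet<>();            // de-duplication
    private final Set<Long> physicalTimers = new HashSet<>(); // stand-in for scheduled tasks
    private Long nextPhysical = null;

    void registerProcessingTimeTimer(String namespace, long time) {
        LogicalTimer timer = new LogicalTimer(time, namespace);
        if (!registered.add(timer)) {
            return; // the same timer is already registered
        }
        LogicalTimer oldHead = queue.peek();
        queue.add(timer);
        long nextTriggerTime = oldHead != null ? oldHead.timestamp : Long.MAX_VALUE;
        // Only a new earliest timer replaces the physical task.
        if (time < nextTriggerTime) {
            schedule(time);
        }
    }

    /** Simulates the clock reaching {@code now}; fires all due logical timers. */
    void setCurrentTime(long now) {
        if (nextPhysical == null || nextPhysical > now) {
            return;
        }
        physicalTimers.remove(nextPhysical);
        nextPhysical = null;
        while (!queue.isEmpty() && queue.peek().timestamp <= now) {
            registered.remove(queue.poll()); // a real service would call the trigger target here
        }
        if (!queue.isEmpty()) {
            schedule(queue.peek().timestamp);
        }
    }

    private void schedule(long time) {
        if (nextPhysical != null) {
            physicalTimers.remove(nextPhysical); // cancel the previously scheduled task
        }
        nextPhysical = time;
        physicalTimers.add(time);
    }

    int numLogicalTimers() { return queue.size(); }
    int numPhysicalTimers() { return physicalTimers.size(); }
    Long nextPhysicalTimestamp() { return nextPhysical; }
}
```

Registering ("ciao", 10/20/30) and ("hello", 10/20) yields five logical timers backed by one physical timer at 10; after the clock reaches 10 and then 20, three and then one logical timer remain, each time with a single physical timer for the next timestamp.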
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83855317
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+   @Test
+   public void testCurrentEventTime() throws Exception {
+
+   StreamTimelyFlatMap operator =
+   new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+
+   OneInputStreamOperatorTestHarness testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.processWatermark(new Watermark(17));
+   testHarness.processElement(new StreamRecord<>(5, 12L));
+
+   testHarness.processWatermark(new Watermark(42));
+   testHarness.processElement(new StreamRecord<>(6, 13L));
+
+   ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new Watermark(17L));
+   expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
+   expectedOutput.add(new Watermark(42L));
+   expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
+
+   TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+   testHarness.close();
+   }
+
+   @Test
+   public void testCurrentProcessingTime() throws Exception {
+
+   StreamTimelyFlatMap operator =
+   new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+
+   OneInputStreamOperatorTestHarness testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(17);
+   testHarness.processElement(new StreamRecord<>(5));
+
+   testHarness.setProcessingTime(42);
+   testHarness.processElement(new StreamRecord<>(6));
+
+   ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new StreamRecord<>("5PT:17"));
+   

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-09-29 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2570

[FLINK-3674] Add an interface for Time aware User Functions

This moves the event-time/processing-time trigger code from
`WindowOperator` behind a well-defined interface that can be used by
operators (and user functions).

`InternalTimerService` is the new interface that has the same
functionality that `WindowOperator` used to have. `TimerService` is the
user-facing interface, which does not allow dealing with namespaces/payloads
and also does not allow deleting timers. There is a default
implementation in `HeapInternalTimerService` that can checkpoint timers to
a stream and also restore from a stream. Right now, this is managed in
`AbstractStreamOperator` and operators can ask for an
`InternalTimerService`.
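For readers new to the timer code, here is a rough, self-contained Java sketch of what a heap-based timer service does: timers are (key, namespace, timestamp) entries in a priority queue ordered by timestamp, advancing time fires every due timer, and the pending set can be written to and read back from a stream. `SimpleHeapTimerService` and all of its methods are hypothetical and cover processing time only; this is not the `HeapInternalTimerService` API, which also handles event-time timers.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical, simplified heap-based timer service (processing time only). */
class SimpleHeapTimerService {

    /** A timer is scoped to a key and a namespace (e.g. a window). */
    static final class Timer implements Comparable<Timer> {
        final String key;
        final String namespace;
        final long timestamp;

        Timer(String key, String namespace, long timestamp) {
            this.key = key;
            this.namespace = namespace;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Timer other) {
            return Long.compare(timestamp, other.timestamp);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) return false;
            Timer t = (Timer) o;
            return timestamp == t.timestamp && key.equals(t.key) && namespace.equals(t.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, namespace, timestamp);
        }
    }

    private final PriorityQueue<Timer> queue = new PriorityQueue<>(); // earliest timestamp first
    private final Set<Timer> registered = new HashSet<>();            // de-duplicates timers

    // "Internal" operations: namespaces are visible and timers can be deleted.
    void registerProcessingTimeTimer(String key, String namespace, long time) {
        Timer t = new Timer(key, namespace, time);
        if (registered.add(t)) queue.add(t);
    }

    void deleteProcessingTimeTimer(String key, String namespace, long time) {
        Timer t = new Timer(key, namespace, time);
        if (registered.remove(t)) queue.remove(t);
    }

    int numTimers() {
        return queue.size();
    }

    int numTimers(String namespace) {
        return (int) queue.stream().filter(t -> t.namespace.equals(namespace)).count();
    }

    /** Removes and returns every timer with timestamp <= time, earliest first. */
    List<Timer> advanceTo(long time) {
        List<Timer> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= time) {
            Timer t = queue.poll();
            registered.remove(t);
            fired.add(t);
        }
        return fired;
    }

    /** Checkpoint: writes all pending timers to a stream. */
    void snapshot(DataOutputStream out) throws IOException {
        out.writeInt(queue.size());
        for (Timer t : queue) {
            out.writeUTF(t.key);
            out.writeUTF(t.namespace);
            out.writeLong(t.timestamp);
        }
        out.flush();
    }

    /** Restore: rebuilds a service from a stream written by snapshot(). */
    static SimpleHeapTimerService restore(DataInputStream in) throws IOException {
        SimpleHeapTimerService service = new SimpleHeapTimerService();
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            String key = in.readUTF();
            String namespace = in.readUTF();
            long timestamp = in.readLong();
            service.registerProcessingTimeTimer(key, namespace, timestamp);
        }
        return service;
    }
}
```

The `HashSet` makes registering the same (key, namespace, timestamp) twice a no-op, so a timer fires at most once; `PriorityQueue.remove(Object)` is linear, which is fine for a sketch but not for large timer counts.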

This also adds tests for `HeapInternalTimerService`.

This adds two new user functions:
 - `TimelyFlatMapFunction`: an extension of `FlatMapFunction` that also
   allows querying time and setting timers
 - `TimelyCoFlatMapFunction`: the same, but for `CoFlatMapFunction`
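To make the shape of such a function concrete, here is a self-contained Java sketch built on hypothetical stand-ins (`UserTimerService`, `TimelyFlatMapSketch`, `Domain`, and a plain `Consumer` in place of Flink's `Collector`); the interfaces actually added in this PR may have different signatures. The example function mirrors the `5PT:17` output checked in `TimelyFlatMapTest`: it emits the element together with the current processing time and registers a processing-time timer five time units later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the time domain a timer belongs to. */
enum Domain { EVENT_TIME, PROCESSING_TIME }

/** Hypothetical user-facing timer view: query time and register timers; no namespaces, no deletion. */
interface UserTimerService {
    long currentProcessingTime();
    long currentWatermark();
    void registerProcessingTimeTimer(long time);
    void registerEventTimeTimer(long time);
}

/** Hypothetical shape of a "timely" flat map: a per-element method plus a timer callback. */
interface TimelyFlatMapSketch<I, O> {
    void flatMap(I value, UserTimerService timers, Consumer<O> out) throws Exception;
    void onTimer(long timestamp, Domain domain, UserTimerService timers, Consumer<O> out) throws Exception;
}

/** Emits "<value>PT:<processing time>" and asks to be called back five time units later. */
class ProcessingTimeEcho implements TimelyFlatMapSketch<Integer, String> {
    @Override
    public void flatMap(Integer value, UserTimerService timers, Consumer<String> out) {
        long now = timers.currentProcessingTime();
        out.accept(value + "PT:" + now);
        timers.registerProcessingTimeTimer(now + 5);
    }

    @Override
    public void onTimer(long timestamp, Domain domain, UserTimerService timers, Consumer<String> out) {
        out.accept("TIMER:" + domain + "@" + timestamp);
    }
}

/** Minimal fake timer service for driving the function by hand. */
class FakeTimers implements UserTimerService {
    long processingTime;
    long watermark = Long.MIN_VALUE;
    final List<Long> processingTimers = new ArrayList<>();

    @Override public long currentProcessingTime() { return processingTime; }
    @Override public long currentWatermark() { return watermark; }
    @Override public void registerProcessingTimeTimer(long time) { processingTimers.add(time); }
    @Override public void registerEventTimeTimer(long time) {
        throw new UnsupportedOperationException("not needed in this sketch");
    }
}
```

Because the user-facing view has no namespace parameter and no delete method, the function can only add timers; in this sketch, scoping them to a key would be left to the operator.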

There are two new `StreamOperator` implementations for these that use the
`InternalTimerService` interface.

This also adds tests for the two new operators.

This also adds the new interface `KeyContext` that is used for
setting/querying the current key context for state and timers. Timers
are always scoped to a key, for now.
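A minimal Java sketch of why timers need the key context: each timer remembers the key that was current when it was registered, and that key is made current again before the callback runs, so key-scoped state read in the callback belongs to the right key. `KeyContextSketch` and `KeyScopedTimers` are hypothetical names, not the `KeyContext` interface from this PR, and the per-key counter stands in for real keyed state.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a key context shared by state and timers. */
interface KeyContextSketch {
    void setCurrentKey(Object key);
    Object getCurrentKey();
}

/** Timers and toy keyed state, both scoped to the current key. */
class KeyScopedTimers implements KeyContextSketch {
    private Object currentKey;
    // timestamp -> keys that have a timer at that timestamp (TreeMap keeps timestamps ordered)
    private final TreeMap<Long, LinkedHashSet<Object>> timers = new TreeMap<>();
    // toy "keyed state": one counter per key
    private final Map<Object, Integer> counters = new HashMap<>();

    @Override public void setCurrentKey(Object key) { currentKey = key; }
    @Override public Object getCurrentKey() { return currentKey; }

    /** Registers a timer for whatever key is current right now. */
    void registerTimer(long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new LinkedHashSet<>()).add(currentKey);
    }

    void incrementState() { counters.merge(currentKey, 1, Integer::sum); }

    int readState() { return counters.getOrDefault(currentKey, 0); }

    /** Fires all timers up to 'time', restoring each timer's key before invoking the callback. */
    void advanceTo(long time, BiConsumer<Long, KeyScopedTimers> onTimer) {
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, LinkedHashSet<Object>> due = timers.pollFirstEntry();
            for (Object key : due.getValue()) {
                setCurrentKey(key);
                onTimer.accept(due.getKey(), this);
            }
        }
    }
}
```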

Also, this moves the handling of watermarks for both one-input and
two-input operators to `AbstractStreamOperator` so that there is a single,
central ground truth.
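For two-input operators, centralized watermark handling mostly comes down to one rule: the operator's event time is the minimum of the latest watermark seen on each input, and it only moves forward when that minimum increases. The class below is a hypothetical, self-contained sketch of that rule, not the code in `AbstractStreamOperator`; a real operator would also fire due event-time timers and forward the watermark downstream at the marked point.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of min-of-inputs watermark tracking for a two-input operator. */
class TwoInputWatermarkTracker {
    private long input1 = Long.MIN_VALUE;
    private long input2 = Long.MIN_VALUE;
    private long combined = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>(); // combined watermarks, in emission order

    void processWatermark1(long watermark) {
        input1 = watermark;
        maybeAdvance();
    }

    void processWatermark2(long watermark) {
        input2 = watermark;
        maybeAdvance();
    }

    private void maybeAdvance() {
        long newMin = Math.min(input1, input2);
        if (newMin > combined) {
            combined = newMin;
            // A real operator would fire event-time timers <= combined here,
            // then emit the watermark downstream.
            emitted.add(combined);
        }
    }

    long currentWatermark() {
        return combined;
    }
}
```

Taking the minimum means a fast input cannot push event time past data that may still arrive on the slower one.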

There were also a bunch of small changes that I had to make to keep the main
change clean. I would like to keep these as separate commits because they
clearly document what was going on.

## Note for Reviewers
You should probably start with the tests, i.e.
`HeapInternalTimerServiceTest`, `TimelyFlatMapTest` and `TimelyCoFlatMapTest`.
Then, the other interesting bits are `AbstractStreamOperator`, which now deals
with watermarks and checkpointing the timers, and `HeapInternalTimerService`.
Keep in mind that this is just moving the code from `WindowOperator`
to `HeapInternalTimerService` with some generalizations. I didn't try to
optimize any of the data structures that are used.

R: @StephanEwen @StefanRRichter @kl0u for review, please 😃 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink timely-function

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2570


commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d
Author: Aljoscha Krettek 
Date:   2016-09-25T18:58:16Z

Rename TimeServiceProvider to ProcessingTimeService

The name is clashing with the soon-to-be-added
TimerService/InternalTimerService which is meant as an interface for
dealing with both processing time and event time.

TimeServiceProvider is renamed to ProcessingTimeService to reflect the
fact that it is a low-level utility that only deals with "physical"
processing-time trigger tasks.

commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8
Author: Aljoscha Krettek 
Date:   2016-09-28T13:10:35Z

Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests

commit f6dd9c74dc2c58c4263fb6d084651b514898d47a
Author: Aljoscha Krettek 
Date:   2016-09-28T14:35:33Z

Use Processing-Time Service of TestHarness in WindowOperatorTest

Before, this was manually creating a TestProcessingTimeService. Now,
we're using the one that is there by default in
OneInputStreamOperatorTestHarness.

commit 65389d66c5586e6707b7a6bf48df512354fac085
Author: Aljoscha Krettek 
Date:   2016-09-28T14:43:40Z

Refactor OperatorTestHarness to always use TestProcessingTimeService

Before, this allowed handing in a custom ProcessingTimeService, but in
practice it was always a TestProcessingTimeService.

commit 1d013bcacc040552e5783c64d094ec309014457b
Author: Aljoscha Krettek 
Date:   2016-09-28T13:12:26Z

Use TestHarness Processing-time Facility in BucketingSinkTest

Before, this was manually creating a TestProcessingTimeService. Now we
use the one that is there by default in
OneInputStreamOperatorTestHarness.

commit eaf3dd00fefeb2487c7cafff6337123cbe42874b
Author: Aljoscha Krettek 
Date:   2016-09-28T13:32:24Z

Use OperatorTestHarness in AlignedWindowOperator Tests

commit b597d2ef50c27554b83fddaff8873107265340d4