[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/2570 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84255151

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java ---
    @@ -35,10 +34,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;

     /**
    - * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
    + * A {@link ProcessingTimeService} which assigns as current processing time the result of calling
      * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
      */
    -public class DefaultTimeServiceProvider extends TimeServiceProvider {
    +public class DefaultProcessingTimeService extends ProcessingTimeService {
    --- End diff --

    Done
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84251918

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
    +
    +    private final TypeSerializer<K> keySerializer;
    +
    +    private final TypeSerializer<N> namespaceSerializer;
    +
    +    private final ProcessingTimeService processingTimeService;
    +
    +    private long currentWatermark = Long.MIN_VALUE;
    +
    +    private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +    private final KeyContext keyContext;
    +
    +    /**
    +     * Processing time timers that are currently in-flight.
    +     */
    +    private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +    private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +    protected ScheduledFuture<?> nextTimer = null;
    +
    +    /**
    +     * Currently waiting watermark callbacks.
    +     */
    +    private final Set<InternalTimer<K, N>> watermarkTimers;
    +    private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +    public HeapInternalTimerService(
    +            TypeSerializer<K> keySerializer,
    +            TypeSerializer<N> namespaceSerializer,
    +            org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +            KeyContext keyContext,
    +            ProcessingTimeService processingTimeService) {
    +        this.keySerializer = checkNotNull(keySerializer);
    +        this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +        this.triggerTarget = checkNotNull(triggerTarget);
    +        this.keyContext = keyContext;
    +        this.processingTimeService = checkNotNull(processingTimeService);
    +
    +        watermarkTimers = new HashSet<>();
    +        watermarkTimersQueue = new PriorityQueue<>(100);
    +
    +        processingTimeTimers = new HashSet<>();
    +        processingTimeTimersQueue = new PriorityQueue<>(100);
    +    }
    +
    +    public HeapInternalTimerService(
    +            TypeSerializer<K> keySerializer,
    +            TypeSerializer<N> namespaceSerializer,
    +            org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +            KeyContext keyContext,
    +            ProcessingTimeService processingTimeService,
    +            RestoredTimers<K, N> restoredTimers) {
    +
    +        this.keySerializer = checkNotNull(keySerializer);
    +        this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +        this.triggerTarget = checkNotNull(triggerTarget);
    +        this.keyContext = keyContext;
    +        this.processingTimeService = checkNotNull(processingTimeService);
    +
    +        watermarkTimers = restoredTimers.watermarkTimers;
    +        watermarkTimersQueue =
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84251732

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
    +
    +    private final TypeSerializer<K> keySerializer;
    +
    +    private final TypeSerializer<N> namespaceSerializer;
    +
    +    private final ProcessingTimeService processingTimeService;
    +
    +    private long currentWatermark = Long.MIN_VALUE;
    +
    +    private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +    private final KeyContext keyContext;
    +
    +    /**
    +     * Processing time timers that are currently in-flight.
    +     */
    +    private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +    private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +    protected ScheduledFuture<?> nextTimer = null;
    +
    +    /**
    +     * Currently waiting watermark callbacks.
    +     */
    +    private final Set<InternalTimer<K, N>> watermarkTimers;
    +    private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +    public HeapInternalTimerService(
    +            TypeSerializer<K> keySerializer,
    +            TypeSerializer<N> namespaceSerializer,
    +            org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +            KeyContext keyContext,
    +            ProcessingTimeService processingTimeService) {
    +        this.keySerializer = checkNotNull(keySerializer);
    +        this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +        this.triggerTarget = checkNotNull(triggerTarget);
    +        this.keyContext = keyContext;
    +        this.processingTimeService = checkNotNull(processingTimeService);
    +
    +        watermarkTimers = new HashSet<>();
    --- End diff --

    I'm renaming
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84236439

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84251996

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84235862

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -390,4 +425,141 @@ public void close() {
                 output.close();
             }
         }
    +
    +    //
    +    // Watermark handling
    +    //
    +
    +    /**
    +     * Returns a {@link InternalTimerService} that can be used to query current processing time
    +     * and event time and to set timers. An operator can have several timer services, where
    +     * each has its own namespace serializer. Timer services are differentiated by the string
    +     * key that is given when requesting them, if you call this method with the same key
    +     * multiple times you will get the same timer service instance in subsequent requests.
    +     *
    +     * Timers are always scoped to a key, the currently active key of a keyed stream operation.
    +     * When a timer fires, this key will also be set as the currently active key.
    +     *
    +     * Each timer has attached metadata, the namespace. Different timer services
    +     * can have a different namespace type. If you don't need namespace differentiation you
    +     * can use {@link VoidNamespaceSerializer} as the namespace serializer.
    +     *
    +     * @param name The name of the requested timer service. If no service exists under the given
    +     *             name a new one will be created and returned.
    +     * @param keySerializer {@code TypeSerializer} for the keys of the timers.
    +     * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
    +     * @param triggerable The {@link Triggerable} that should be invoked when timers fire
    +     *
    +     * @param <K> The type of the timer keys.
    +     * @param <N> The type of the timer namespace.
    +     */
    +    public <K, N> InternalTimerService<N> getInternalTimerService(
    +            String name,
    +            TypeSerializer<K> keySerializer,
    +            TypeSerializer<N> namespaceSerializer,
    +            Triggerable<K, N> triggerable) {
    +
    +        @SuppressWarnings("unchecked")
    +        HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
    +
    +        if (service == null) {
    +            if (restoredServices != null && restoredServices.containsKey(name)) {
    +                @SuppressWarnings("unchecked")
    +                HeapInternalTimerService.RestoredTimers<K, N> restoredService =
    +                        (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
    +
    +                service = new HeapInternalTimerService<>(
    +                        keySerializer,
    +                        namespaceSerializer,
    +                        triggerable,
    +                        this,
    +                        getRuntimeContext().getProcessingTimeService(),
    +                        restoredService);
    +
    +            } else {
    +                service = new HeapInternalTimerService<>(
    +                        keySerializer,
    +                        namespaceSerializer,
    +                        triggerable,
    +                        this,
    +                        getRuntimeContext().getProcessingTimeService());
    +            }
    +            timerServices.put(name, service);
    +        }
    +
    +        return service;
    +    }
    +
    +    public void processWatermark(Watermark mark) throws Exception {
    +        for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
    +            service.advanceWatermark(mark.getTimestamp());
    +        }
    +        output.emitWatermark(mark);
    +    }
    +
    +    public void processWatermark1(Watermark mark) throws Exception {
    +        input1Watermark = mark.getTimestamp();
    +        long newMin = Math.min(input1Watermark, input2Watermark);
    --- End diff --

    I agree with @StefanRRichter's comment below, and I would add that much of the code in `processWatermark1` and `processWatermark2` is repeated, so the common part can become a private method that both methods call.
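As an illustration of the refactoring suggested above, here is a minimal sketch of two watermark handlers that share one private helper. It is not the code from the pull request: the class `TwoInputWatermarkTracker`, the helper `advanceCombinedWatermark`, and the `emitted` list are hypothetical names, and plain `long` timestamps stand in for Flink's `Watermark` objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a two-input operator; not Flink's AbstractStreamOperator. */
class TwoInputWatermarkTracker {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Watermarks that would be forwarded downstream, recorded so the sketch can be inspected. */
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        advanceCombinedWatermark();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        advanceCombinedWatermark();
    }

    /** Shared logic: the operator's watermark is the minimum over both inputs. */
    private void advanceCombinedWatermark() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            // A real operator would advance its timer services here before emitting.
            emitted.add(newMin);
        }
    }
}
```

With the helper in place, each public method only records its own input's watermark, so a change to the combining rule has to be made in one place.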
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84237685

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java ---
    @@ -35,10 +34,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;

     /**
    - * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
    + * A {@link ProcessingTimeService} which assigns as current processing time the result of calling
      * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
      */
    -public class DefaultTimeServiceProvider extends TimeServiceProvider {
    +public class DefaultProcessingTimeService extends ProcessingTimeService {
    --- End diff --

    We could also rename it to SystemProcessingTimeService, or something more indicative of where the clock ticks come from.
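To make the naming argument concrete, the following is a rough sketch of a time service whose clock ticks come from the system clock, as the Javadoc in the diff describes (`System.currentTimeMillis()` plus a `ScheduledThreadPoolExecutor`). The class `SystemClockTimeService`, its `Callback` interface, and the method names are assumptions made for illustration; they are not the API of the class under review.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified sketch; not the class from the pull request. */
class SystemClockTimeService {

    /** Hypothetical callback type invoked when a timer fires. */
    interface Callback {
        void onProcessingTime(long timestamp);
    }

    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

    /** The current processing time is simply the wall clock. */
    long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    /** Schedules the callback to run once the wall clock reaches the given timestamp. */
    ScheduledFuture<?> registerTimer(long timestamp, Callback target) {
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0L);
        return executor.schedule(
                () -> target.onProcessingTime(timestamp), delay, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Because both the current time and the timer delay derive from the system clock, a name that mentions the system clock says more about the behaviour than a name containing "Default".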
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84236659

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84237459

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
    +
    +    private final TypeSerializer<K> keySerializer;
    +
    +    private final TypeSerializer<N> namespaceSerializer;
    +
    +    private final ProcessingTimeService processingTimeService;
    +
    +    private long currentWatermark = Long.MIN_VALUE;
    +
    +    private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +    private final KeyContext keyContext;
    +
    +    /**
    +     * Processing time timers that are currently in-flight.
    +     */
    +    private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +    private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +    protected ScheduledFuture<?> nextTimer = null;
    +
    +    /**
    +     * Currently waiting watermark callbacks.
    +     */
    +    private final Set<InternalTimer<K, N>> watermarkTimers;
    +    private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +    public HeapInternalTimerService(
    +            TypeSerializer<K> keySerializer,
    +            TypeSerializer<N> namespaceSerializer,
    +            org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +            KeyContext keyContext,
    +            ProcessingTimeService processingTimeService) {
    +        this.keySerializer = checkNotNull(keySerializer);
    +        this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +        this.triggerTarget = checkNotNull(triggerTarget);
    +        this.keyContext = keyContext;
    +        this.processingTimeService = checkNotNull(processingTimeService);
    +
    +        watermarkTimers = new HashSet<>();
    --- End diff --

    I am wondering whether it would be better to rename this to `EventTimeTimers`. This pairs well with `processingTimeTimers` and also indicates what we are talking about: event time, whose "clock ticks" are the watermarks, and processing time, whose clock ticks come from the wall clock.
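The "watermarks are the clock ticks of event time" point can be sketched as follows. This is a simplified illustration under stated assumptions, not the implementation in the diff: timers here are bare timestamps rather than key and namespace scoped objects, and `EventTimeTimerSketch`, `registerEventTimeTimer`, and `fired` are hypothetical names. It keeps the same pairing of a `HashSet` for de-duplication and a `PriorityQueue` for ordering that the fields in the diff suggest.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical sketch of heap-based event-time timers; names are illustrative only. */
class EventTimeTimerSketch {
    // The set rejects duplicate registrations; the queue yields timers in time order.
    private final Set<Long> eventTimeTimers = new HashSet<>();
    private final PriorityQueue<Long> eventTimeTimersQueue = new PriorityQueue<>();

    private long currentWatermark = Long.MIN_VALUE;

    /** Timestamps of fired timers, recorded so the sketch can be inspected. */
    final List<Long> fired = new ArrayList<>();

    void registerEventTimeTimer(long time) {
        if (eventTimeTimers.add(time)) {
            eventTimeTimersQueue.add(time);
        }
    }

    /** A watermark is one "tick" of the event-time clock: fire every timer at or before it. */
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!eventTimeTimersQueue.isEmpty() && eventTimeTimersQueue.peek() <= watermark) {
            long time = eventTimeTimersQueue.poll();
            eventTimeTimers.remove(time);
            fired.add(time);
        }
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

Nothing in this sketch consults the wall clock; timers only fire when a watermark arrives, which is the distinction the proposed name is meant to convey.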
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84039288

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,325 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.operators.Triggerable;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable {
    +
    +    private final TypeSerializer<K> keySerializer;
    +
    +    private final TypeSerializer<N> namespaceSerializer;
    +
    +    private final ProcessingTimeService processingTimeService;
    +
    +    private long currentWatermark = Long.MIN_VALUE;
    +
    +    private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +    private final KeyContext keyContext;
    +
    +    /**
    +     * Processing time timers that are currently in-flight.
    +     */
    +    private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +    private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +    protected ScheduledFuture<?> nextTimer = null;
    +
    +    /**
    +     * Currently waiting watermark callbacks.
    +     */
    +    private final Set<InternalTimer<K, N>> watermarkTimers;
    +    private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +    public HeapInternalTimerService(
    +            TypeSerializer<K> keySerializer,
    +            TypeSerializer<N> namespaceSerializer,
    +            org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +            KeyContext keyContext,
    +            ProcessingTimeService processingTimeService) {
    +        this.keySerializer = checkNotNull(keySerializer);
    +        this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +        this.triggerTarget = checkNotNull(triggerTarget);
    +        this.keyContext = keyContext;
    +        this.processingTimeService = checkNotNull(processingTimeService);
    +
    +        watermarkTimers = new HashSet<>();
    +        watermarkTimersQueue = new PriorityQueue<>(100);
    +
    +        processingTimeTimers = new HashSet<>();
    +        processingTimeTimersQueue = new PriorityQueue<>(100);
    +    }
    +
    +    public HeapInternalTimerService(
    +            TypeSerializer<K> keySerializer,
    +            TypeSerializer<N> namespaceSerializer,
    +            org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +            KeyContext keyContext,
    +            ProcessingTimeService processingTimeService,
    +            RestoredTimers<K, N> restoredTimers) {
    +
    +        this.keySerializer = checkNotNull(keySerializer);
    +        this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +        this.triggerTarget = checkNotNull(triggerTarget);
    +        this.keyContext = keyContext;
+ this.processingTimeService = checkNotNull(processingTimeService); + --- End diff -- That one's a bit tricky since I only get the `TypeSerializers` once the user requests a
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84038623 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -390,4 +425,141 @@ public void close() { output.close(); } } + + // + // Watermark handling + // + + /** +* Returns a {@link InternalTimerService} that can be used to query current processing time +* and event time and to set timers. An operator can have several timer services, where +* each has its own namespace serializer. Timer services are differentiated by the string +* key that is given when requesting them, if you call this method with the same key +* multiple times you will get the same timer service instance in subsequent requests. +* +* Timers are always scoped to a key, the currently active key of a keyed stream operation. +* When a timer fires, this key will also be set as the currently active key. +* +* Each timer has attached metadata, the namespace. Different timer services +* can have a different namespace type. If you don't need namespace differentiation you +* can use {@link VoidNamespaceSerializer} as the namespace serializer. +* +* @param name The name of the requested timer service. If no service exists under the given +* name a new one will be created and returned. +* @param keySerializer {@code TypeSerializer} for the keys of the timers. +* @param namespaceSerializer {@code TypeSerializer} for the timer namespace. +* @param triggerable The {@link Triggerable} that should be invoked when timers fire +* +* @param The type of the timer keys. +* @param The type of the timer namespace. 
+*/ + publicInternalTimerService getInternalTimerService( + String name, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + Triggerable triggerable) { + + @SuppressWarnings("unchecked") + HeapInternalTimerService service = (HeapInternalTimerService ) timerServices.get(name); + + if (service == null) { + if (restoredServices != null && restoredServices.containsKey(name)) { + @SuppressWarnings("unchecked") + HeapInternalTimerService.RestoredTimers restoredService = --- End diff -- I can replace it by this but not sure if it's more readable: ``` @SuppressWarnings("unchecked") HeapInternalTimerService.RestoredTimers restoredService = restoredServices == null ? null : (HeapInternalTimerService.RestoredTimers ) restoredServices.remove(name); if (restoredService != null) { ... } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84038706 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -390,4 +425,141 @@ public void close() { output.close(); } } + + // + // Watermark handling + // + + /** +* Returns a {@link InternalTimerService} that can be used to query current processing time +* and event time and to set timers. An operator can have several timer services, where +* each has its own namespace serializer. Timer services are differentiated by the string +* key that is given when requesting them, if you call this method with the same key +* multiple times you will get the same timer service instance in subsequent requests. +* +* Timers are always scoped to a key, the currently active key of a keyed stream operation. +* When a timer fires, this key will also be set as the currently active key. +* +* Each timer has attached metadata, the namespace. Different timer services +* can have a different namespace type. If you don't need namespace differentiation you +* can use {@link VoidNamespaceSerializer} as the namespace serializer. +* +* @param name The name of the requested timer service. If no service exists under the given +* name a new one will be created and returned. +* @param keySerializer {@code TypeSerializer} for the keys of the timers. +* @param namespaceSerializer {@code TypeSerializer} for the timer namespace. +* @param triggerable The {@link Triggerable} that should be invoked when timers fire +* +* @param The type of the timer keys. +* @param The type of the timer namespace. 
+*/ + publicInternalTimerService getInternalTimerService( + String name, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + Triggerable triggerable) { + + @SuppressWarnings("unchecked") + HeapInternalTimerService service = (HeapInternalTimerService ) timerServices.get(name); + + if (service == null) { + if (restoredServices != null && restoredServices.containsKey(name)) { + @SuppressWarnings("unchecked") + HeapInternalTimerService.RestoredTimers restoredService = + (HeapInternalTimerService.RestoredTimers ) restoredServices.remove(name); + + service = new HeapInternalTimerService<>( + keySerializer, + namespaceSerializer, + triggerable, + this, + getRuntimeContext().getProcessingTimeService(), + restoredService); + + } else { + service = new HeapInternalTimerService<>( + keySerializer, + namespaceSerializer, + triggerable, + this, + getRuntimeContext().getProcessingTimeService()); + } + timerServices.put(name, service); + } + + return service; + } + + public void processWatermark(Watermark mark) throws Exception { + for (HeapInternalTimerService service : timerServices.values()) { + service.advanceWatermark(mark.getTimestamp()); + } + output.emitWatermark(mark); + } + + public void processWatermark1(Watermark mark) throws Exception { + input1Watermark = mark.getTimestamp(); + long newMin = Math.min(input1Watermark, input2Watermark); + if (newMin > combinedWatermark) { + combinedWatermark = newMin; + processWatermark(new Watermark(combinedWatermark)); + } + } + + public void processWatermark2(Watermark mark) throws Exception { --- End diff -- Jip, I'm already trying to address that in another PR for changing the
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84036274 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; + +/** + * Interface for working with time and timers. + * + * This is the internal version of {@link org.apache.flink.streaming.api.TimerService} + * that allows to specify a key and a namespace to which timers should be scoped. + * + * @param Type of the namespace to which timers are scoped. + */ +@Internal +public interface InternalTimerService { + + /** Returns the current processing time. */ + long currentProcessingTime(); + + /** Returns the current event time. */ + long currentWatermark(); --- End diff -- Jip, will rename everything to `currentWatermark()` because that's also what we already have in other places. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84035970 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +/** + * Base interface for timely flatMap functions. FlatMap functions take elements and transform them, + * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists + * and arrays. + * + * A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal + * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react + * to them firing. 
+ * + * {@code + * DataStream input = ...; + * + * DataStream result = input.flatMap(new MyTimelyFlatMapFunction()); + * } + * + * @param Type of the input elements. + * @param Type of the returned elements. + */ +@PublicEvolving +public interface TimelyFlatMapFunction extends Function, Serializable { + + /** +* The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms +* it into zero, one, or more elements. +* +* @param value The input value. +* @param timerService A {@link TimerService} that allows setting timers and querying the +*current time. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + void flatMap(I value, TimerService timerService, Collector out) throws Exception; + + /** +* Called when a timer set using {@link TimerService} fires. +* +* @param timestamp The timestamp of the firing timer. +* @param timeDomain The {@link TimeDomain} of the firing timer. +* @param timerService A {@link TimerService} that allows setting timers and querying the +*current time. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector out) throws Exception ; --- End diff -- I had it initially with an initialisation method that would get the `TimerService` but then went for this version because users don't have to have an extra field and store the timer service if they don't use a `RichFunction`. Another alternative would be to put all the parameters (timestamp, time domain, timer service) into something like a context parameter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
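The context-parameter alternative mentioned in the comment above could look roughly like the following sketch. All names here (`OnTimerContextSketch`, `ContextTimelyFunctionSketch`, and the reduced stand-in types) are hypothetical and not part of the PR; a plain `List` stands in for the `Collector`, and the `throws Exception` clause is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-ins for the PR's types, reduced to what the sketch needs. */
enum TimeDomainSketch { EVENT_TIME, PROCESSING_TIME }

interface TimerServiceSketch {
    long currentProcessingTime();
}

/** Hypothetical context object bundling the per-invocation timer parameters. */
interface OnTimerContextSketch {
    long timestamp();
    TimeDomainSketch timeDomain();
    TimerServiceSketch timerService();
}

/** Hypothetical variant of the function interface: onTimer takes one context argument. */
interface ContextTimelyFunctionSketch<O> {
    void onTimer(OnTimerContextSketch ctx, List<O> out);
}

class ContextParameterDemo {
    /** Fires a single timer against a small user function and returns what it emitted. */
    static List<String> fire(long timestamp, TimeDomainSketch domain) {
        OnTimerContextSketch ctx = new OnTimerContextSketch() {
            @Override public long timestamp() { return timestamp; }
            @Override public TimeDomainSketch timeDomain() { return domain; }
            @Override public TimerServiceSketch timerService() { return () -> 0L; }
        };
        // The user function reads only the fields it needs from the context.
        ContextTimelyFunctionSketch<String> fn =
            (c, out) -> out.add(c.timeDomain() + "@" + c.timestamp());
        List<String> out = new ArrayList<>();
        fn.onTimer(ctx, out);
        return out;
    }
}
```

One trade-off of this shape is that new per-timer information can be added to the context later without changing the `onTimer` signature, at the cost of one more type for users to learn.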
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84035592 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link StreamTimelyFlatMap}. 
+ */ +public class TimelyFlatMapTest extends TestLogger { + + @Test + public void testCurrentEventTime() throws Exception { + + StreamTimelyFlatMapoperator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.processElement(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.processElement(new StreamRecord<>(6, 13L)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testCurrentProcessingTime() throws Exception { + + StreamTimelyFlatMap operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement(new StreamRecord<>(6)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5PT:17")); +
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84031809 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link StreamTimelyFlatMap}. 
+ */ +public class TimelyFlatMapTest extends TestLogger { + + @Test + public void testCurrentEventTime() throws Exception { + + StreamTimelyFlatMapoperator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.processElement(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.processElement(new StreamRecord<>(6, 13L)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testCurrentProcessingTime() throws Exception { + + StreamTimelyFlatMap operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement(new StreamRecord<>(6)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5PT:17")); +
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84031706 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java --- @@ -15,38 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.streaming.api.operators; -package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; - -import java.util.concurrent.ScheduledFuture; - -class NoOpTimerService extends TimeServiceProvider { - - private volatile boolean terminated; - - @Override - public long getCurrentProcessingTime() { - return System.currentTimeMillis(); - } - - @Override - public ScheduledFuture registerTimer(long timestamp, Triggerable target) { - return null; - } - - @Override - public boolean isTerminated() { - return terminated; - } - - @Override - public void quiesceAndAwaitPending() {} - - @Override - public void shutdownService() { - terminated = true; - } +/** + * Interface for things that can be called by {@link InternalTimerService}. + * + * @param Type of the keys to which timers are scoped. + * @param Type of the namespace to which timers are scoped. + */ +@Internal +public interface Triggerable{ --- End diff -- Done, changed it to `ProcessingTimeCallback`, which has a method `onProcessingTime()`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
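As a rough illustration of the renamed callback, the sketch below pairs a callback interface with a manually driven clock. The single `long` parameter of `onProcessingTime` and the `ManualProcessingTimeSketch` class are assumptions made for this example; the comment above only states the interface and method names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Assumed shape of the callback; the single long parameter is a guess for illustration. */
interface ProcessingTimeCallbackSketch {
    void onProcessingTime(long timestamp);
}

/** Hypothetical manually driven clock, loosely like a test time service. */
class ManualProcessingTimeSketch {
    // Timers sorted by timestamp; several callbacks may share one timestamp.
    private final TreeMap<Long, List<ProcessingTimeCallbackSketch>> timers = new TreeMap<>();

    void registerTimer(long timestamp, ProcessingTimeCallbackSketch target) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(target);
    }

    /** Moves the clock to 'now' and invokes every callback that is due, in timestamp order. */
    void setCurrentTime(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, List<ProcessingTimeCallbackSketch>> due = timers.pollFirstEntry();
            for (ProcessingTimeCallbackSketch callback : due.getValue()) {
                callback.onProcessingTime(due.getKey());
            }
        }
    }
}
```

A name like this says what the callback reacts to, which avoids the overlap with the window `Trigger` concept raised in the review.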
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83854971 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java --- @@ -15,38 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.streaming.api.operators; -package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; - -import java.util.concurrent.ScheduledFuture; - -class NoOpTimerService extends TimeServiceProvider { - - private volatile boolean terminated; - - @Override - public long getCurrentProcessingTime() { - return System.currentTimeMillis(); - } - - @Override - public ScheduledFuture registerTimer(long timestamp, Triggerable target) { - return null; - } - - @Override - public boolean isTerminated() { - return terminated; - } - - @Override - public void quiesceAndAwaitPending() {} - - @Override - public void shutdownService() { - terminated = true; - } +/** + * Interface for things that can be called by {@link InternalTimerService}. + * + * @param Type of the keys to which timers are scoped. + * @param Type of the namespace to which timers are scoped. + */ +@Internal +public interface Triggerable{ --- End diff -- I found this name a bit confusing, it could lead to confusion with the concept of Triggers. Furthermore, there is another class with the same simple name in a different package. Maybe this could be called `TimerCallback`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83862277 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -390,4 +425,141 @@ public void close() { output.close(); } } + + // + // Watermark handling + // + + /** +* Returns a {@link InternalTimerService} that can be used to query current processing time +* and event time and to set timers. An operator can have several timer services, where +* each has its own namespace serializer. Timer services are differentiated by the string +* key that is given when requesting them, if you call this method with the same key +* multiple times you will get the same timer service instance in subsequent requests. +* +* Timers are always scoped to a key, the currently active key of a keyed stream operation. +* When a timer fires, this key will also be set as the currently active key. +* +* Each timer has attached metadata, the namespace. Different timer services +* can have a different namespace type. If you don't need namespace differentiation you +* can use {@link VoidNamespaceSerializer} as the namespace serializer. +* +* @param name The name of the requested timer service. If no service exists under the given +* name a new one will be created and returned. +* @param keySerializer {@code TypeSerializer} for the keys of the timers. +* @param namespaceSerializer {@code TypeSerializer} for the timer namespace. +* @param triggerable The {@link Triggerable} that should be invoked when timers fire +* +* @param The type of the timer keys. +* @param The type of the timer namespace. 
+*/ + publicInternalTimerService getInternalTimerService( + String name, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + Triggerable triggerable) { + + @SuppressWarnings("unchecked") + HeapInternalTimerService service = (HeapInternalTimerService ) timerServices.get(name); + + if (service == null) { + if (restoredServices != null && restoredServices.containsKey(name)) { + @SuppressWarnings("unchecked") + HeapInternalTimerService.RestoredTimers restoredService = + (HeapInternalTimerService.RestoredTimers ) restoredServices.remove(name); + + service = new HeapInternalTimerService<>( + keySerializer, + namespaceSerializer, + triggerable, + this, + getRuntimeContext().getProcessingTimeService(), + restoredService); + + } else { + service = new HeapInternalTimerService<>( + keySerializer, + namespaceSerializer, + triggerable, + this, + getRuntimeContext().getProcessingTimeService()); + } + timerServices.put(name, service); + } + + return service; + } + + public void processWatermark(Watermark mark) throws Exception { + for (HeapInternalTimerService service : timerServices.values()) { + service.advanceWatermark(mark.getTimestamp()); + } + output.emitWatermark(mark); + } + + public void processWatermark1(Watermark mark) throws Exception { + input1Watermark = mark.getTimestamp(); + long newMin = Math.min(input1Watermark, input2Watermark); + if (newMin > combinedWatermark) { + combinedWatermark = newMin; + processWatermark(new Watermark(combinedWatermark)); + } + } + + public void processWatermark2(Watermark mark) throws Exception { --- End diff -- As a general comment, somehow I don't like how two cases (one and
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83861268

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -390,4 +425,141 @@ public void close() {
             output.close();
         }
     }
+
+    //
+    // Watermark handling
+    //
+
+    /**
+     * Returns a {@link InternalTimerService} that can be used to query current processing time
+     * and event time and to set timers. An operator can have several timer services, where
+     * each has its own namespace serializer. Timer services are differentiated by the string
+     * key that is given when requesting them, if you call this method with the same key
+     * multiple times you will get the same timer service instance in subsequent requests.
+     *
+     * Timers are always scoped to a key, the currently active key of a keyed stream operation.
+     * When a timer fires, this key will also be set as the currently active key.
+     *
+     * Each timer has attached metadata, the namespace. Different timer services
+     * can have a different namespace type. If you don't need namespace differentiation you
+     * can use {@link VoidNamespaceSerializer} as the namespace serializer.
+     *
+     * @param name The name of the requested timer service. If no service exists under the given
+     *             name a new one will be created and returned.
+     * @param keySerializer {@code TypeSerializer} for the keys of the timers.
+     * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+     * @param triggerable The {@link Triggerable} that should be invoked when timers fire
+     *
+     * @param <K> The type of the timer keys.
+     * @param <N> The type of the timer namespace.
+     */
+    public <K, N> InternalTimerService<N> getInternalTimerService(
+            String name,
+            TypeSerializer<K> keySerializer,
+            TypeSerializer<N> namespaceSerializer,
+            Triggerable<K, N> triggerable) {
+
+        @SuppressWarnings("unchecked")
+        HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+        if (service == null) {
+            if (restoredServices != null && restoredServices.containsKey(name)) {
+                @SuppressWarnings("unchecked")
+                HeapInternalTimerService.RestoredTimers<K, N> restoredService =
--- End diff --

`contains()` + `remove()` seems a bit redundant for this use. I would just always remove and check the return value for null.
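The suggestion above can be illustrated with a minimal, self-contained sketch. It does not use any Flink classes: `RemoveAndCheckSketch`, `Restored`, and `create` are hypothetical names chosen for this example, and the map simply stands in for `restoredServices` from the diff. The point is only that `Map.remove` already returns the previous value (or `null`), so a separate `containsKey` lookup is not needed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink code: shows "remove and check for null"
// instead of "containsKey, then remove".
final class RemoveAndCheckSketch {

    /** Hypothetical stand-in for restored timer state. */
    static final class Restored {
        final String label;

        Restored(String label) {
            this.label = label;
        }
    }

    /** Describes how the service registered under {@code name} would be created. */
    static String create(Map<String, Restored> restoredServices, String name) {
        // Single lookup: remove() returns the removed value, or null if the key was absent.
        Restored restored = restoredServices != null ? restoredServices.remove(name) : null;
        if (restored != null) {
            return "restored:" + restored.label;
        }
        return "fresh";
    }

    public static void main(String[] args) {
        Map<String, Restored> restored = new HashMap<>();
        restored.put("window-timers", new Restored("snapshot-1"));

        System.out.println(create(restored, "window-timers")); // restored:snapshot-1
        System.out.println(create(restored, "window-timers")); // fresh (entry was consumed)
        System.out.println(create(null, "other"));             // fresh
    }
}
```

One caveat: this shortcut only works when the map never stores `null` values, because a `null` return from `remove` then unambiguously means "no entry".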
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83858268

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Internal class for keeping track of in-flight timers.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public class InternalTimer<K, N> implements Comparable<InternalTimer<?, ?>> {
+    private final long timestamp;
+    private final K key;
+    private final N namespace;
+
+    public InternalTimer(long timestamp, K key, N namespace) {
+        this.timestamp = timestamp;
+        this.key = key;
+        this.namespace = namespace;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public K getKey() {
+        return key;
+    }
+
+    public N getNamespace() {
+        return namespace;
+    }
+
+    @Override
+    public int compareTo(InternalTimer<?, ?> o) {
+        return Long.compare(this.timestamp, o.timestamp);
--- End diff --

Method `compareTo` is not fully aligned with `equals`, which is acceptable but strongly advised against by the documentation of `Comparable`.
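To make the concern concrete, here is a small sketch of what "not consistent with equals" can lead to. Everything in it is hypothetical and simplified: `Timer` uses plain `String` keys and namespaces rather than the generic `K` and `N` of the diff, and it assumes `equals` compares all three fields, which the quoted diff does not show. It compares a timestamp-only ordering with one that breaks ties on the remaining fields.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.TreeSet;

// Hypothetical sketch, not Flink code: contrasts an ordering that ignores
// key/namespace with one that is consistent with equals().
final class TimerOrderingSketch {

    /** Simplified timer with String key and namespace (an assumption for this example). */
    static final class Timer {
        final long timestamp;
        final String key;
        final String namespace;

        Timer(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer other = (Timer) o;
            return timestamp == other.timestamp
                    && key.equals(other.key)
                    && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key, namespace);
        }
    }

    /** Timestamp only: two different timers with the same timestamp compare as 0. */
    static final Comparator<Timer> BY_TIMESTAMP = Comparator.comparingLong(t -> t.timestamp);

    /** Consistent with equals(): ties on timestamp are broken by key, then namespace. */
    static final Comparator<Timer> CONSISTENT =
            Comparator.<Timer>comparingLong(t -> t.timestamp)
                    .thenComparing(t -> t.key)
                    .thenComparing(t -> t.namespace);

    /** Adds all timers to a sorted set using the given ordering and reports its size. */
    static int sizeInTreeSet(Comparator<Timer> order, Timer... timers) {
        TreeSet<Timer> set = new TreeSet<>(order);
        for (Timer t : timers) {
            set.add(t);
        }
        return set.size();
    }

    public static void main(String[] args) {
        Timer a = new Timer(10L, "key-a", "ns");
        Timer b = new Timer(10L, "key-b", "ns");

        System.out.println(sizeInTreeSet(BY_TIMESTAMP, a, b)); // 1: b is treated as a duplicate of a
        System.out.println(sizeInTreeSet(CONSISTENT, a, b));   // 2: both timers are kept
    }
}
```

A `PriorityQueue`, as used in the diff, tolerates elements that compare as equal, so the timestamp-only ordering is workable there. The risk shows up if such timers are ever put into a sorted set or map.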
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83857663

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
+ * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
+ * and arrays.
+ *
+ * A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
+ * to them firing.
+ *
+ * {@code
+ * DataStream input = ...;
+ *
+ * DataStream result = input.flatMap(new MyTimelyFlatMapFunction());
+ * }
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
+
+    /**
+     * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
+     * it into zero, one, or more elements.
+     *
+     * @param value The input value.
+     * @param timerService A {@link TimerService} that allows setting timers and querying the
+     *                     current time.
+     * @param out The collector for returning result values.
+     *
+     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+     *                   to fail and may trigger recovery.
+     */
+    void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
+
+    /**
+     * Called when a timer set using {@link TimerService} fires.
+     *
+     * @param timestamp The timestamp of the firing timer.
+     * @param timeDomain The {@link TimeDomain} of the firing timer.
+     * @param timerService A {@link TimerService} that allows setting timers and querying the
+     *                     current time.
+     * @param out The collector for returning result values.
+     *
+     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+     *                   to fail and may trigger recovery.
+     */
+    void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ;
--- End diff --

I wonder if `TimeDomain` and `TimerService` should be parameters to the methods in this interface. I assume both remain stable for the lifetime of the UDF and could be passed once in some init method that can also be pre-implemented in a `RichTimelyFlatMapFunction`. Maybe there is a good reason against this, but I like to keep the number of parameters small when possible.
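The alternative raised in this comment, handing the timer service over once instead of on every call, could look roughly like the sketch below. It is an illustration under stated assumptions, not the API from the pull request: `SimpleTimerService`, `InitStyleFlatMap`, `AbstractInitStyleFlatMap`, `Echo`, and `FakeTimerService` are all hypothetical names, a plain `List` replaces `Collector`, and the `TimeDomain` argument is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: the timer service is injected once via
// init() rather than being passed to flatMap() and onTimer() on each call.
final class InitStyleTimelySketch {

    /** Minimal stand-in for a timer service (assumed shape for this example). */
    interface SimpleTimerService {
        long currentProcessingTime();

        void registerProcessingTimeTimer(long time);
    }

    /** Variant of the function interface with a one-time init method. */
    interface InitStyleFlatMap<I, O> {
        void init(SimpleTimerService timerService);

        void flatMap(I value, List<O> out);

        void onTimer(long timestamp, List<O> out);
    }

    /** Base class that stores the service, in the spirit of a "rich" function. */
    abstract static class AbstractInitStyleFlatMap<I, O> implements InitStyleFlatMap<I, O> {
        protected SimpleTimerService timerService;

        @Override
        public void init(SimpleTimerService timerService) {
            this.timerService = timerService;
        }
    }

    /** Example function: forwards each element and sets a timer 5 ms in the future. */
    static final class Echo extends AbstractInitStyleFlatMap<Integer, String> {
        @Override
        public void flatMap(Integer value, List<String> out) {
            out.add("in:" + value);
            timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5);
        }

        @Override
        public void onTimer(long timestamp, List<String> out) {
            out.add("timer:" + timestamp);
        }
    }

    /** Fake service that only records which timers were registered. */
    static final class FakeTimerService implements SimpleTimerService {
        long now;
        final List<Long> registered = new ArrayList<>();

        @Override
        public long currentProcessingTime() {
            return now;
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            registered.add(time);
        }
    }

    /** Drives the example function by hand: one element, then all registered timers. */
    static List<String> run() {
        FakeTimerService service = new FakeTimerService();
        Echo function = new Echo();
        function.init(service);

        List<String> out = new ArrayList<>();
        service.now = 17;
        function.flatMap(1, out);
        for (long timestamp : service.registered) {
            function.onTimer(timestamp, out);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [in:1, timer:22]
    }
}
```

The trade-off is the usual one: shorter method signatures in exchange for mutable state in the function and an extra lifecycle step that must run before the first element arrives.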
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83860194 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. 
+ */ +public class HeapInternalTimerServiceimplements InternalTimerService, Triggerable { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue > processingTimeTimersQueue; + private final Set > processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. +*/ + private final Set > watermarkTimers; + private final PriorityQueue > watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; 
+ this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = restoredTimers.watermarkTimers; + watermarkTimersQueue =
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83856028 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link StreamTimelyFlatMap}. 
+ */ +public class TimelyFlatMapTest extends TestLogger { + + @Test + public void testCurrentEventTime() throws Exception { + + StreamTimelyFlatMapoperator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.processElement(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.processElement(new StreamRecord<>(6, 13L)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testCurrentProcessingTime() throws Exception { + + StreamTimelyFlatMap operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement(new StreamRecord<>(6)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5PT:17")); +
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83859446 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. 
+ */ +public class HeapInternalTimerServiceimplements InternalTimerService, Triggerable { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue > processingTimeTimersQueue; + private final Set > processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. +*/ + private final Set > watermarkTimers; + private final PriorityQueue > watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; 
+ this.processingTimeService = checkNotNull(processingTimeService); + --- End diff -- RestoredTimers are serialized with their typeserializers. It could make sense to have some
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83858860

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for working with time and timers.
+ *
+ * This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
+ * that allows to specify a key and a namespace to which timers should be scoped.
+ *
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface InternalTimerService<N> {
+
+    /** Returns the current processing time. */
+    long currentProcessingTime();
+
+    /** Returns the current event time. */
+    long currentWatermark();
--- End diff --

The corresponding method in the public interface is called `currentEventTime`. Would it make sense to keep both method names in sync?
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83860732 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java --- @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.contains; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; + +/** + * Tests for {@link HeapInternalTimerService}. + */ +public class HeapInternalTimerServiceTest { + + private static InternalTimeranyInternalTimer() { + return any(); + } + + /** +* Verify that we only ever have one processing-time task registered at the +* {@link ProcessingTimeService}. 
+*/ + @Test + public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception { + @SuppressWarnings("unchecked") + Triggerable mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + + HeapInternalTimerService timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + keyContext.setCurrentKey(0); + + timerService.registerProcessingTimeTimer("ciao", 10); + timerService.registerProcessingTimeTimer("ciao", 20); + timerService.registerProcessingTimeTimer("ciao", 30); + timerService.registerProcessingTimeTimer("hello", 10); + timerService.registerProcessingTimeTimer("hello", 20); + + assertEquals(5, timerService.numProcessingTimeTimers()); + assertEquals(2, timerService.numProcessingTimeTimers("hello")); + assertEquals(3, timerService.numProcessingTimeTimers("ciao")); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L)); + + processingTimeService.setCurrentTime(10); + + assertEquals(3, timerService.numProcessingTimeTimers()); + assertEquals(1, timerService.numProcessingTimeTimers("hello")); + assertEquals(2, timerService.numProcessingTimeTimers("ciao")); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L)); + + processingTimeService.setCurrentTime(20); + + assertEquals(1, timerService.numProcessingTimeTimers()); + assertEquals(0, timerService.numProcessingTimeTimers("hello")); + assertEquals(1, timerService.numProcessingTimeTimers("ciao")); + + assertEquals(1,
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83855317 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link StreamTimelyFlatMap}. 
+ */ +public class TimelyFlatMapTest extends TestLogger { + + @Test + public void testCurrentEventTime() throws Exception { + + StreamTimelyFlatMapoperator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.processElement(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.processElement(new StreamRecord<>(6, 13L)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testCurrentProcessingTime() throws Exception { + + StreamTimelyFlatMap operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement(new StreamRecord<>(6)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5PT:17")); +
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2570 [FLINK-3674] Add an interface for Time aware User Functions This moves the event-time/processing-time trigger code from `WindowOperator` behind a well defined interface that can be used by operators (and user functions). `InternalTimerService` is the new interface that has the same functionality that `WindowOperator` used to have. `TimerService` is the user facing interface that does not allow dealing with namespaces/payloads and also does not allow deleting timers. There is a default implementation in `HeapInternalTimerService` that can checkpoint timers to a stream and also restore from a stream. Right now, this is managed in `AbstractStreamOperator` and operators can ask for an `InternalTimerService`. This also adds tests for `HeapInternalTimerService`. This adds two new user functions: - `TimelyFlatMapFunction`: an extension of `FlatMapFunction` that also allows querying time and setting timers - `TimelyCoFlatMapFunction`: the same, but for `CoFlatMapFunction` There are two new `StreamOperator` implementations for these that use the `InternalTimerService` interface. This also adds tests for the two new operators. This also adds the new interface `KeyContext` that is used for setting/querying the current key context for state and timers. Timers are always scoped to a key, for now. Also, this moves the handling of watermarks for both one-input and two-input operators to `AbstractStreamOperators` so that we have a central ground-truth. There was also a bunch of small changes that I had to do to make the proper change more clean. I would like to keep these as separate commits because they clearly document what was going on. ## Note for Reviewers You should probably start from the tests, i.e. `HeapInternalTimerServiceTest`, `TimelyFlatMapTest` and `TimelyCoFlatMapTest`. 
Then, the other interesting bits are `AbstractStreamOperator`, which now deals with watermarks and with checkpointing the timers, and `HeapInternalTimerService`. Keep in mind that this is just moving the code from `WindowOperator` to `HeapInternalTimerService` with some generalizations. I didn't try to optimize any of the data structures that are used.

R: @StephanEwen @StefanRRichter @kl0u for review, please

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink timely-function

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2570.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2570

commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d
Author: Aljoscha Krettek
Date: 2016-09-25T18:58:16Z

    Rename TimeServiceProvider to ProcessingTimeService

    The name is clashing with the soon-to-be-added TimerService/InternalTimerService which is meant as an interface for dealing with both processing time and event time. TimeServiceProvider is renamed to ProcessingTimeService to reflect the fact that it is a low-level utility that only deals with "physical" processing-time trigger tasks.

commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8
Author: Aljoscha Krettek
Date: 2016-09-28T13:10:35Z

    Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests

commit f6dd9c74dc2c58c4263fb6d084651b514898d47a
Author: Aljoscha Krettek
Date: 2016-09-28T14:35:33Z

    Use Processing-Time Service of TestHarness in WindowOperatorTest

    Before, this was manually creating a TestProcessingTimeService, now, we're using the one that is there by default in OneInputStreamOperatorTestHarness.
commit 65389d66c5586e6707b7a6bf48df512354fac085
Author: Aljoscha Krettek
Date: 2016-09-28T14:43:40Z

    Refactor OperatorTestHarness to always use TestProcessingTimeService

    Before, this would allow handing in a custom ProcessingTimeService but this was in reality always TestProcessingTimeService.

commit 1d013bcacc040552e5783c64d094ec309014457b
Author: Aljoscha Krettek
Date: 2016-09-28T13:12:26Z

    Use TestHarness Processing-time Facility in BucketingSinkTest

    Before, this was manually creating a TestProcessingTimeService. Now we use the one that is there by default in OneInputStreamOperatorTestHarness.

commit eaf3dd00fefeb2487c7cafff6337123cbe42874b
Author: Aljoscha Krettek
Date: 2016-09-28T13:32:24Z

    Use OperatorTestHarness in AlignedWindowOperator Tests

commit b597d2ef50c27554b83fddaff8873107265340d4