[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...
Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629 Hi @tillrohrmann @mproch, I have updated the PR, including a wrapper class for `RuntimeContext` specialized for `RichAsyncFunction`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
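To make the wrapper idea concrete, here is a minimal sketch in plain Java. It is not the code from the PR: `SketchContext`, `FullContext` and `AsyncFunctionContext` are hypothetical names and do not mirror Flink's real `RuntimeContext` interface. The assumption is only that the wrapper forwards harmless metadata calls and rejects state access.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a runtime context; NOT Flink's RuntimeContext interface.
interface SketchContext {
    String getTaskName();
    int getIndexOfThisSubtask();
    Object getState(String name);
}

// Full-featured context, as the operator itself might hold it (assumption).
final class FullContext implements SketchContext {
    private final Map<String, Object> state = new HashMap<>();

    public String getTaskName() { return "async-wait"; }
    public int getIndexOfThisSubtask() { return 0; }
    public Object getState(String name) { return state.get(name); }
}

// Wrapper handed to the async function: forwards metadata, rejects state access.
final class AsyncFunctionContext implements SketchContext {
    private final SketchContext delegate;

    AsyncFunctionContext(SketchContext delegate) { this.delegate = delegate; }

    public String getTaskName() { return delegate.getTaskName(); }
    public int getIndexOfThisSubtask() { return delegate.getIndexOfThisSubtask(); }

    public Object getState(String name) {
        // State access from callback threads is what the wrapper is meant to prevent.
        throw new UnsupportedOperationException("State is not supported in async functions.");
    }
}

final class ContextWrapperDemo {
    public static void main(String[] args) {
        SketchContext ctx = new AsyncFunctionContext(new FullContext());
        System.out.println(ctx.getTaskName());
        try {
            ctx.getState("counter");
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With this shape, user code still gets the task name and subtask index, and a state call fails fast instead of reading state under the wrong key.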
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r92536062 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.async; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector; + +/** + * Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. 
+ * + * + * {@link RichAsyncFunction#getRuntimeContext()} and {@link RichAsyncFunction#getRuntimeContext()} are + * not supported because the key may get changed while accessing states in the working thread. + * + * @param <IN> The type of the input elements. + * @param <OUT> The type of the returned elements. + */ + +@PublicEvolving +public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction + implements AsyncFunction<IN, OUT> { + + @Override + public abstract void asyncInvoke(IN input, AsyncCollector collector) throws Exception; + + @Override + public RuntimeContext getRuntimeContext() { --- End diff -- I agree. It will not miss anything except state access :-)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r92315209 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.async; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector; + +/** + * Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. 
+ * + * + * {@link RichAsyncFunction#getRuntimeContext()} and {@link RichAsyncFunction#getRuntimeContext()} are + * not supported because the key may get changed while accessing states in the working thread. + * + * @param <IN> The type of the input elements. + * @param <OUT> The type of the returned elements. + */ + +@PublicEvolving +public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction + implements AsyncFunction<IN, OUT> { + + @Override + public abstract void asyncInvoke(IN input, AsyncCollector collector) throws Exception; + + @Override + public RuntimeContext getRuntimeContext() { --- End diff -- Personally, I prefer the second option. It is handy for user code, even with the limitations.
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r92314677 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.async; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector; + +/** + * Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. 
+ * + * + * {@link RichAsyncFunction#getRuntimeContext()} and {@link RichAsyncFunction#getRuntimeContext()} are + * not supported because the key may get changed while accessing states in the working thread. + * + * @param <IN> The type of the input elements. + * @param <OUT> The type of the returned elements. + */ + +@PublicEvolving +public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction + implements AsyncFunction<IN, OUT> { + + @Override + public abstract void asyncInvoke(IN input, AsyncCollector collector) throws Exception; + + @Override + public RuntimeContext getRuntimeContext() { --- End diff -- Hi @mproch, in our production use case we expose some metrics in the `AsyncWaitOperator`, such as the number of elements in the `AsyncCollectorBuffer`. Maybe the `AsyncWaitOperator` could provide those basic metrics itself. The `RichAsyncFunction` is useful for its `open()` and `close()` methods, where the async clients can be initialized and freed. I think there are two options: 1. Remove `RichAsyncFunction`, since `AsyncWaitOperator` currently only supports non-keyed streams. 2. Keep `RichAsyncFunction`, but `RichAsyncFunction.getRuntimeContext()` would work differently...
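The `open()` / `close()` argument can be illustrated with a small sketch. This is plain Java and does not use Flink classes: `LookupFunction` is a hypothetical name, and a thread pool stands in for whatever async client a real job would use.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical rich-style async function: open() creates the client, close() frees it.
final class LookupFunction {
    // A thread pool stands in for an async client (assumption for illustration).
    private ExecutorService clientPool;

    void open() {
        clientPool = Executors.newFixedThreadPool(2);
    }

    // Issues the request on the client's threads and returns immediately.
    CompletableFuture<String> asyncInvoke(String key) {
        return CompletableFuture.supplyAsync(() -> "value-for-" + key, clientPool);
    }

    void close() {
        clientPool.shutdown();
        try {
            // Give in-flight requests a moment to finish before returning.
            clientPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isClosed() {
        return clientPool != null && clientPool.isTerminated();
    }
}

final class LookupFunctionDemo {
    public static void main(String[] args) {
        LookupFunction fn = new LookupFunction();
        fn.open();
        System.out.println(fn.asyncInvoke("k1").join());
        fn.close();
        System.out.println("closed: " + fn.isClosed());
    }
}
```

The point of keeping a rich variant is visible here: without lifecycle hooks the client would have to be created lazily inside `asyncInvoke` and there would be no obvious place to release it.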
[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...
Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629 Hi @tillrohrmann. I have updated the PR. Thanks.
[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...
Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629 Hi @tillrohrmann, thanks for your help. I am still working on it, adding more test cases and modifying the internal data structure. It is almost done, but because of other work I cannot work on it for the next few days. I think I can probably update the PR on Wednesday, after fixing the unit test failure.
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r89741541 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; + +@Internal +public class AsyncWaitOperator<IN, OUT> + extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> + implements OneInputStreamOperator<IN, OUT> +{ + private final int DEFAULT_BUFFER_SIZE = 1000; + + private static final long serialVersionUID = 1L; + + private final static String STATE_NAME = "_async_wait_operator_state_"; + + /** +* {@link TypeSerializer} for inputs while making snapshots. 
+*/ + private transient StreamElementSerializer inStreamElementSerializer; + + /** +* input stream elements from the state +*/ + private transient ListState recoveredStreamElements; + + private transient TimestampedCollector collector; + + private transient AsyncCollectorBuffer<IN, OUT> buffer; + + /** +* Checkpoint lock from {@link StreamTask#lock} +*/ + private transient Object checkpointLock; + + private int bufferSize = DEFAULT_BUFFER_SIZE; + private AsyncDataStream.OutputMode mode; + + /** +* For test only. Normally this flag is true, indicating that the Emitter Thread +* in the buffer will work. +*/ + private boolean emitFlag = true; + + /** +* Test serializer used in unit test +*/ + private StreamElementSerializer inStreamElementSerializerForTest; + + + public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction) { + super(asyncFunction); + chainingStrategy = ChainingStrategy.ALWAYS; + } + + public void setBufferSize(int size) { + Preconditions.checkArgument(size > 0, "The number of concurrent async operation should be greater than 0."); + bufferSize = size; + } + + public void setOutputMode(AsyncDataStream.OutputMode mode) { + this.mode = mode; + } + + @VisibleForTesting + public void setEmitFlag(boolean emitFlag) { + this.emitFlag = emitFlag; + } +
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r89431529 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer<IN, OUT> { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator<IN, OUT> operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. 
+*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. +*/ + private final Output<StreamRecord> output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output<StreamRecord> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL."); + + this.bufferSize = bufferSize; +
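For readers following the buffer design, the sketch below shows one way an insertion-ordered map can drive emission under two output modes. It is a single-threaded illustration in plain Java, not the PR's `AsyncCollectorBuffer`: the names `EmitBufferSketch`, `Pending` and `drain()` are hypothetical, there is no emitter thread or checkpoint lock, and the assumed semantics are that ordered mode emits only from the head while unordered mode emits whatever has finished.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EmitBufferSketch {
    enum Mode { ORDERED, UNORDERED }

    // Hypothetical pending entry; it is done once the async call has completed.
    static final class Pending {
        private String result;
        private boolean done;

        void complete(String result) {
            this.result = result;
            this.done = true;
        }
    }

    // LinkedHashMap keeps insertion order, i.e. the arrival order of the inputs.
    private final Map<Pending, String> queue = new LinkedHashMap<>();
    private final int capacity;
    private final Mode mode;

    EmitBufferSketch(int capacity, Mode mode) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("Buffer size should be greater than 0.");
        }
        this.capacity = capacity;
        this.mode = mode;
    }

    boolean isFull() {
        return queue.size() >= capacity;
    }

    Pending add(String input) {
        if (isFull()) {
            throw new IllegalStateException("Buffer is full.");
        }
        Pending pending = new Pending();
        queue.put(pending, input);
        return pending;
    }

    // Removes and returns every result that may be emitted right now.
    List<String> drain() {
        List<String> out = new ArrayList<>();
        Iterator<Pending> it = queue.keySet().iterator();
        while (it.hasNext()) {
            Pending pending = it.next();
            if (pending.done) {
                out.add(pending.result);
                it.remove();
            } else if (mode == Mode.ORDERED) {
                break; // head is not finished, so later results have to wait
            }
        }
        return out;
    }
}

final class EmitBufferDemo {
    public static void main(String[] args) {
        EmitBufferSketch buffer = new EmitBufferSketch(3, EmitBufferSketch.Mode.ORDERED);
        EmitBufferSketch.Pending a = buffer.add("a");
        EmitBufferSketch.Pending b = buffer.add("b");
        b.complete("B");
        System.out.println(buffer.drain()); // nothing yet, "a" is still pending
        a.complete("A");
        System.out.println(buffer.drain()); // both results, in arrival order
    }
}
```

The real buffer additionally has to coordinate with the task thread and checkpointing through the checkpoint lock, which this sketch leaves out on purpose.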
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r89424283 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; + +/** + * {@link AsyncCollector} collects data / error in user codes while processing async i/o. + * + * @param Input type + * @param Output type + */ +@Internal +public class AsyncCollector<IN, OUT> { + private List result; + private Throwable error; + + private boolean isDone = false; + + private final AsyncCollectorBuffer<IN, OUT> buffer; + + public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) { + Preconditions.checkNotNull(buffer, "Reference to AsyncCollectorBuffer should not be null"); + + this.buffer = buffer; + } + + public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer, boolean isDone) { + this(buffer); + this.isDone = isDone; --- End diff -- Actually, they work as a stub here. 
I think we can optimize out those `AsyncCollector`s for `Watermark` and `LatencyMarker` while optimizing the code in `nothingToDo()`.
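A rough sketch of what "work as a stub" means here, in plain Java. `StubCollectorSketch` is a hypothetical, single-threaded simplification and not the PR's `AsyncCollector`: it only shows that an entry created with `isDone = true` never needs an async callback before it can be emitted, which is why such entries are candidates for being optimized out.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, single-threaded simplification of a result collector.
final class StubCollectorSketch<OUT> {
    private List<OUT> result;
    private Throwable error;
    private boolean isDone;

    // Regular entry: waits for the async call to report back.
    StubCollectorSketch() {
        this(false);
    }

    // Stub entry: created already done, e.g. for a marker with no async work.
    StubCollectorSketch(boolean isDone) {
        this.isDone = isDone;
    }

    void collect(List<OUT> result) {
        this.result = result;
        this.isDone = true;
    }

    void collect(Throwable error) {
        this.error = error;
        this.isDone = true;
    }

    boolean isDone() {
        return isDone;
    }

    boolean hasError() {
        return error != null;
    }

    // A stub has no results of its own, so it yields an empty list.
    List<OUT> getResult() {
        return result == null ? Collections.<OUT>emptyList() : result;
    }
}

final class StubCollectorDemo {
    public static void main(String[] args) {
        StubCollectorSketch<String> marker = new StubCollectorSketch<>(true);
        StubCollectorSketch<String> record = new StubCollectorSketch<>();
        System.out.println(marker.isDone() + " " + record.isDone());
        record.collect(Collections.singletonList("x"));
        System.out.println(record.isDone() + " " + record.getResult());
    }
}
```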
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r89294937 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r89293393 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r89291235 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r89289895 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer<IN, OUT> { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator<IN, OUT> operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. 
+*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. +*/ + private final Output<StreamRecord> output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output<StreamRecord> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL."); + + this.bufferSize = bufferSize; +
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r89285857 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; --- End diff -- Placing `AsyncCollectorBuffer` in the `functions` package does not seem right. I created a `buffer` sub-package under `async` and placed it there.
[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...
Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629 Hi @tillrohrmann. I have updated the code again: it now adds the Emitter Thread, uses the latest `OperatorStateStore`, and updates the tests. Thanks.
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86916022 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Tests for {@link AsyncCollectorBuffer}. 
These test that: + * + * + * Add a new item into the buffer + * Ordered mode processing + * Unordered mode processing + * Error handling + * + */ +public class AsyncCollectorBufferTest { + private AsyncFunction<Integer, Integer> function; + + private AsyncWaitOperator<Integer, Integer> operator; + + private AsyncCollectorBuffer<Integer, Integer> buffer; + + private Output<StreamRecord> output; + + @Before + public void setUp() throws Exception { + function = new AsyncFunction<Integer, Integer>() { + @Override + public void asyncInvoke(Integer input, AsyncCollector<Integer, Integer> collector) throws Exception { + + } + }; + + operator = new AsyncWaitOperator<>(function); + Class[] classes = AbstractStreamOperator.class.getDeclaredClasses(); + Class latencyClass = null; + for (Class c : classes) { + if (c.getName().indexOf("LatencyGauge") != -1) { + latencyClass = c; + } + } + + Constructor explicitConstructor = latencyClass.getDeclaredConstructors()[0]; + explicitConstructor.setAccessible(true); + Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10)); + + output = new FakedOutput(new ArrayList()); + TimestampedCollector collector =new TimestampedCollector(output); + buffer = + new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.ORDERED, operator); + buffer.setOutput(collector, output); + + Whitebox.setInternalState(operator, "output", output); + } + + @Test + public void testAdd() throws Exception { + Thread.sleep(1000); + buffer.add(new Watermark(0l)); + buffer.add(new LatencyMarker(111L, 1, 1)); + Assert.assertEquals(((SimpleLinkedList) Whitebox.getInternalState(buffer, "queue")).size(), 2); + Assert.assertEquals(((Map) Whitebox.getInternalState(buffer, "collectorToStreamElement")).size(), 2); + Assert.assertEquals(((Map) Whitebox.getInternalState(buffer,
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86914170 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class); + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator<IN, OUT> operator; + + /** +* {@link AsyncCollector} queue. +*/ + private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>(); + /** +* A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement} +*/ + private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>(); + /** +* A hash map keeping {@link AsyncCollector} and their node references in the #queue. 
+*/ + private final Map<AsyncCollector<IN, OUT>, SimpleLinkedList.Node> collectorToQueue = new HashMap<>(); + + private final LinkedList finishedCollectors = new LinkedList<>(); + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. +*/ + private TimestampedCollector timestampedCollector; + private Output<StreamRecord> output; + + /** +* Locks and conditions to synchronize with main thread and emitter thread. +*/ + private final Lock lock; + private final Condition notFull; + private final Condition taskDone; + private final Condition isEmpty; + + /** +* Error from user codes. +*/ + private volatile Exception error; + + private final Emitter emitter; + private final Thread emitThread; + + private boolean isCheckpointing; + + public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode mode, AsyncWaitOperator operator) { + Preconditions.checkArgument(maxSize > 0, "Future buffer size should be greater than 0."); + + this.bufferSize = maxSize; + this.mode = mode; + this.operator = operator; + + this.lock = new ReentrantLock(true); + this.notFull = this.lock.newCondition(); + this.taskDone = this.lock.newConditi
[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...
Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629 Maybe this is a solution. We would not change the type of `checkpointLock`, which is a plain `Object` and quite efficient, and we would not change the order of `broadcastBarriers` and `operator.snapshotState()`. Instead, the code that **pauses** the `EmitterThread` is placed in `StreamTask.performCheckpoint()`, like this:

```java
private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
	LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());

	synchronized (lock) {
		if (isRunning) {
			// stop working threads first.
			for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
				if (operator instanceof AsyncWaitOperator) {
					// pauseEmitterThread() is the method proposed here; it needs the cast.
					((AsyncWaitOperator<?, ?>) operator).pauseEmitterThread();
				}
			}

			// emit the barriers first, then snapshot the operators' states (order unchanged).
			operatorChain.broadcastCheckpointBarrier(
				checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()
			);

			checkpointState(checkpointMetaData);

			return true;
		} else {
			return false;
		}
	}
}
```
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86709933 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws synchronized (lock) { if (isRunning) { + checkpointState(checkpointMetaData); - // Since both state checkpointing and downstream barrier emission occurs in this - // lock scope, they are an atomic operation regardless of the order in which they occur. - // Given this, we immediately emit the checkpoint barriers, so the downstream operators - // can start their checkpoint work as soon as possible + // broadcast barriers after snapshot operators' states. operatorChain.broadcastCheckpointBarrier( - checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()); - - checkpointState(checkpointMetaData); + checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp() + ); --- End diff -- Can you share the test program and test result? I wrote a sample program to measure the cost of acquiring and releasing a lock with `Object` / `ReentrantLock` / `ReentrantReadWriteLock`, and found that when a single thread uses the lock, all three give similar results. With multiple threads, however, `ReentrantLock` and `ReentrantReadWriteLock` outperform the `Object` lock. Here is my benchmark program: [link](https://github.com/bjlovegithub/JavaLockTest/blob/master/src/LockTest.java) And these are the sampled results from a run on my laptop: [stat](https://github.com/bjlovegithub/JavaLockTest/blob/master/stat/LockTest_result.data)
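For readers who want to try this kind of comparison, here is a minimal sketch. It is not the linked `LockTest` program: the class name `LockComparison`, the thread counts, and the iteration count are assumptions made for illustration. There is no JIT warm-up, so the timings are only indicative.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical micro-benchmark sketch; not the benchmark linked in the comment. */
class LockComparison {

    /** Shared counter, always accessed under the lock being measured. */
    static long counter;

    /** Runs `threads` workers doing `iterations` guarded increments each; returns elapsed nanoseconds. */
    static long run(int threads, int iterations, Runnable guardedIncrement) {
        counter = 0;
        Thread[] workers = new Thread[threads];
        long start = System.nanoTime();
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    guardedIncrement.run();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return System.nanoTime() - start;
    }

    /** Increment guarded by an intrinsic monitor (`synchronized`). */
    static Runnable withMonitor(Object monitor) {
        return () -> {
            synchronized (monitor) {
                counter++;
            }
        };
    }

    /** Increment guarded by a java.util.concurrent lock. */
    static Runnable withLock(Lock lock) {
        return () -> {
            lock.lock();
            try {
                counter++;
            } finally {
                lock.unlock();
            }
        };
    }

    public static void main(String[] args) {
        int iterations = 1_000_000;
        for (int threads : new int[] {1, 4}) {
            System.out.println(threads + " thread(s):");
            System.out.println("  Object monitor         ns=" + run(threads, iterations, withMonitor(new Object())));
            System.out.println("  ReentrantLock          ns=" + run(threads, iterations, withLock(new ReentrantLock())));
            System.out.println("  ReentrantReadWriteLock ns="
                + run(threads, iterations, withLock(new ReentrantReadWriteLock().writeLock())));
        }
    }
}
```

The read-write lock is exercised through its write lock only, which matches the exclusive use a checkpoint lock would see; read-lock sharing is not measured here.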
[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...
Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629 Good point ;D Hmm, I think we have to override `StreamOperator.notifyCheckpointComplete()` in `AsyncWaitOperator` so that, once the `TaskManager` notifies the `Task` that the checkpoint has completed, the `EmitterThread` can resume work as soon as possible.
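The pause-then-resume idea can be sketched with a small gate object. This is only an illustration under assumptions: `EmitterGate` and its methods are hypothetical names, not part of the PR or of Flink. The operator's checkpoint-complete hook would call `resume()`.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical pause/resume gate for an emitter thread; names are illustrative only. */
class EmitterGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resumed = lock.newCondition();
    private boolean paused;

    /** Would be called when a checkpoint starts. */
    void pause() {
        lock.lock();
        try {
            paused = true;
        } finally {
            lock.unlock();
        }
    }

    /** Would be called from the checkpoint-complete notification. */
    void resume() {
        lock.lock();
        try {
            paused = false;
            resumed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Called by the emitter before each emission; blocks while the gate is paused. */
    void awaitResumed() throws InterruptedException {
        lock.lock();
        try {
            while (paused) {
                resumed.await();
            }
        } finally {
            lock.unlock();
        }
    }

    boolean isPaused() {
        lock.lock();
        try {
            return paused;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        EmitterGate gate = new EmitterGate();
        gate.pause(); // checkpoint starts
        Thread emitter = new Thread(() -> {
            try {
                gate.awaitResumed(); // returns only once the gate is open
                System.out.println("emitter resumed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.start();
        gate.resume(); // checkpoint-complete notification
        emitter.join();
    }
}
```

The gate only stops the emitter between emissions; an emission already in progress is assumed to be excluded by the checkpoint lock.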
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86706353 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Tests for {@link AsyncCollectorBuffer}. 
These test that: + * + * + * Add a new item into the buffer + * Ordered mode processing + * Unordered mode processing + * Error handling + * + */ +public class AsyncCollectorBufferTest { + private AsyncFunction<Integer, Integer> function; + + private AsyncWaitOperator<Integer, Integer> operator; + + private AsyncCollectorBuffer<Integer, Integer> buffer; + + private Output<StreamRecord> output; + + @Before + public void setUp() throws Exception { + function = new AsyncFunction<Integer, Integer>() { + @Override + public void asyncInvoke(Integer input, AsyncCollector<Integer, Integer> collector) throws Exception { + + } + }; + + operator = new AsyncWaitOperator<>(function); + Class[] classes = AbstractStreamOperator.class.getDeclaredClasses(); + Class latencyClass = null; + for (Class c : classes) { + if (c.getName().indexOf("LatencyGauge") != -1) { + latencyClass = c; + } + } + + Constructor explicitConstructor = latencyClass.getDeclaredConstructors()[0]; + explicitConstructor.setAccessible(true); + Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10)); + + output = new FakedOutput(new ArrayList()); + TimestampedCollector collector =new TimestampedCollector(output); + buffer = + new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.ORDERED, operator); + buffer.setOutput(collector, output); + + Whitebox.setInternalState(operator, "output", output); + } + + @Test + public void testAdd() throws Exception { + Thread.sleep(1000); + buffer.add(new Watermark(0l)); + buffer.add(new LatencyMarker(111L, 1, 1)); + Assert.assertEquals(((SimpleLinkedList) Whitebox.getInternalState(buffer, "queue")).size(), 2); + Assert.assertEquals(((Map) Whitebox.getInternalState(buffer, "collectorToStreamElement")).size(), 2); + Assert.assertEquals(((Map) Whitebox.getInternalState(buffer,
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86706002 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class); + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator<IN, OUT> operator; + + /** +* {@link AsyncCollector} queue. +*/ + private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>(); + /** +* A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement} +*/ + private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>(); + /** +* A hash map keeping {@link AsyncCollector} and their node references in the #queue. 
+*/ + private final Map<AsyncCollector<IN, OUT>, SimpleLinkedList.Node> collectorToQueue = new HashMap<>(); + + private final LinkedList finishedCollectors = new LinkedList<>(); + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. +*/ + private TimestampedCollector timestampedCollector; + private Output<StreamRecord> output; + + /** +* Locks and conditions to synchronize with main thread and emitter thread. +*/ + private final Lock lock; + private final Condition notFull; + private final Condition taskDone; + private final Condition isEmpty; + + /** +* Error from user codes. +*/ + private volatile Exception error; + + private final Emitter emitter; + private final Thread emitThread; + + private boolean isCheckpointing; + + public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode mode, AsyncWaitOperator operator) { + Preconditions.checkArgument(maxSize > 0, "Future buffer size should be greater than 0."); + + this.bufferSize = maxSize; + this.mode = mode; + this.operator = operator; + + this.lock = new ReentrantLock(true); + this.notFull = this.lock.newCondition(); + this.taskDone = this.lock.newConditi
[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...
Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629 This case may happen when all elements have arrived. `StreamTask` will then close all operators, which leads to `AsyncCollectorBuffer.waitEmpty`. In that function, the `Emitter Thread` is woken up.
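A rough sketch of that shutdown path, under assumptions: `DrainingBuffer` is a hypothetical stand-in for the buffer, it holds plain integers instead of collectors, and only the method name `waitEmpty` comes from the comment. The point is that `waitEmpty` wakes the emitter and then blocks until everything has been emitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the buffer; shows waitEmpty() waking an idle emitter. */
class DrainingBuffer {
    private final Object lock = new Object();
    private final Queue<Integer> pending = new ArrayDeque<>();
    private final List<Integer> emitted = new ArrayList<>();
    private boolean closing;

    void add(int value) {
        synchronized (lock) {
            pending.add(value);
            lock.notifyAll();
        }
    }

    /** Emitter loop: idles while nothing is pending, exits once closing and drained. */
    void emitLoop() {
        synchronized (lock) {
            try {
                while (true) {
                    while (pending.isEmpty() && !closing) {
                        lock.wait();
                    }
                    if (pending.isEmpty()) {
                        return; // closing and fully drained
                    }
                    emitted.add(pending.poll());
                    lock.notifyAll(); // let waitEmpty() re-check the queue
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Called on close: wakes the emitter, then blocks until the queue is empty. */
    void waitEmpty() throws InterruptedException {
        synchronized (lock) {
            closing = true;
            lock.notifyAll();
            while (!pending.isEmpty()) {
                lock.wait();
            }
        }
    }

    /** Convenience driver: feeds the values, closes the buffer, returns what was emitted. */
    static List<Integer> drain(int... values) {
        DrainingBuffer buffer = new DrainingBuffer();
        Thread emitter = new Thread(buffer::emitLoop);
        emitter.start();
        for (int value : values) {
            buffer.add(value);
        }
        try {
            buffer.waitEmpty();
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return buffer.emitted;
    }

    public static void main(String[] args) {
        System.out.println(drain(1, 2, 3)); // prints [1, 2, 3]
    }
}
```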
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86527618 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws synchronized (lock) { if (isRunning) { + checkpointState(checkpointMetaData); - // Since both state checkpointing and downstream barrier emission occurs in this - // lock scope, they are an atomic operation regardless of the order in which they occur. - // Given this, we immediately emit the checkpoint barriers, so the downstream operators - // can start their checkpoint work as soon as possible + // broadcast barriers after snapshot operators' states. operatorChain.broadcastCheckpointBarrier( - checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()); - - checkpointState(checkpointMetaData); + checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp() + ); --- End diff -- Disabling chaining is an option, but forcing it is a little strict. Users may chain the operators together to avoid network overhead. I think it is worth trying to introduce `ReentrantReadWriteLock`. Maybe I can run a test comparing a plain `Object` monitor (`synchronized`) with `ReentrantReadWriteLock`, to measure the performance difference when locking and unlocking many times.
[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...
Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629 For exactly-once processing guarantees: when `AsyncCollectorBuffer.getStreamElementsInBuffer` has finished, `isCheckpointing` is set to true, which idles the `Emitter Thread` when it checks `nonthingToDo()`. So once snapshotting in `StreamTask` is triggered, every `Emitter Thread` has already stopped there.
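The idling mechanism described in this comment can be illustrated with a small sketch. It is not the code from the pull request: only the `isCheckpointing` flag is taken from the comment, while `EmitterIdleSketch`, `complete`, `beginSnapshot`, `endSnapshot`, `emitOne` and `emittedSoFar` are hypothetical names, and the buffer is reduced to a queue of integers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of an emitter that idles while a snapshot is in progress.
class EmitterIdleSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition workAvailable = lock.newCondition();
    private final Queue<Integer> finished = new ArrayDeque<>(); // completed async results
    private final List<Integer> emitted = new ArrayList<>();    // what went downstream
    private boolean isCheckpointing;

    /** An async call finished; wake the emitter. */
    void complete(int result) {
        lock.lock();
        try {
            finished.add(result);
            workAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Copies the buffered elements and raises the flag that idles the emitter. */
    List<Integer> beginSnapshot() {
        lock.lock();
        try {
            isCheckpointing = true;
            return new ArrayList<>(finished);
        } finally {
            lock.unlock();
        }
    }

    /** Clears the flag and lets the emitter continue. */
    void endSnapshot() {
        lock.lock();
        try {
            isCheckpointing = false;
            workAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** One emitter step; returns false if it stayed idle for the whole timeout. */
    boolean emitOne(long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // Idle while a snapshot is running or nothing has completed yet.
            while (isCheckpointing || finished.isEmpty()) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = workAvailable.awaitNanos(nanos);
            }
            emitted.add(finished.poll());
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    List<Integer> emittedSoFar() {
        lock.lock();
        try {
            return new ArrayList<>(emitted);
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch, a call to `emitOne` made between `beginSnapshot` and `endSnapshot` emits nothing, so the copied elements and the downstream output cannot diverge while the snapshot is taken.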
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86520537 --- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java --- @@ -195,6 +202,70 @@ public Integer map(NonSerializable value) throws Exception { env.execute(); } + @Test + public void testAsyncWaitOperator() throws Exception { + final int numElements = 10; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements)).setParallelism(1); + + AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() { + transient ExecutorService executorService; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + executorService = Executors.newFixedThreadPool(numElements); + } + + @Override + public void close() throws Exception { + super.close(); + executorService.shutdown(); + } + + @Override + public void asyncInvoke(final Tuple2<Integer, NonSerializable> input, + final AsyncCollector<Tuple2<Integer, NonSerializable>, Integer> collector) throws Exception { + this.executorService.submit(new Runnable() { + @Override + public void run() { + // wait for while to simulate async operation here + int sleep = (int) (new Random().nextFloat() * 1000); + try { + Thread.sleep(sleep); + List ret = new ArrayList<>(); + ret.add(input.f0+input.f0); + collector.collect(ret); + } + catch (InterruptedException e) { + collector.collect(new ArrayList(0)); + } + } + }); + } + }; + + DataStream orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1); + orderedResult.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE).setParallelism(1); + + DataStream unorderedResult = AsyncDataStream.unorderedWait(input, function, 
2).setParallelism(1); + unorderedResult.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE); --- End diff -- I think this is dictated by `StreamingMultipleProgramsTestBase`, which needs the temporary files written by the test job to verify the result.
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86518429 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Tests for {@link AsyncCollectorBuffer}. 
These test that: + * + * + * Add a new item into the buffer + * Ordered mode processing + * Unordered mode processing + * Error handling + * + */ +public class AsyncCollectorBufferTest { + private AsyncFunction<Integer, Integer> function; + + private AsyncWaitOperator<Integer, Integer> operator; + + private AsyncCollectorBuffer<Integer, Integer> buffer; + + private Output<StreamRecord> output; + + @Before + public void setUp() throws Exception { + function = new AsyncFunction<Integer, Integer>() { + @Override + public void asyncInvoke(Integer input, AsyncCollector<Integer, Integer> collector) throws Exception { + + } + }; + + operator = new AsyncWaitOperator<>(function); + Class[] classes = AbstractStreamOperator.class.getDeclaredClasses(); + Class latencyClass = null; + for (Class c : classes) { + if (c.getName().indexOf("LatencyGauge") != -1) { + latencyClass = c; + } + } + + Constructor explicitConstructor = latencyClass.getDeclaredConstructors()[0]; + explicitConstructor.setAccessible(true); + Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10)); + + output = new FakedOutput(new ArrayList()); + TimestampedCollector collector =new TimestampedCollector(output); + buffer = + new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.ORDERED, operator); + buffer.setOutput(collector, output); + + Whitebox.setInternalState(operator, "output", output); + } + + @Test + public void testAdd() throws Exception { + Thread.sleep(1000); --- End diff -- That is not necessary; I will remove it.
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86517940 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws synchronized (lock) { if (isRunning) { + checkpointState(checkpointMetaData); - // Since both state checkpointing and downstream barrier emission occurs in this - // lock scope, they are an atomic operation regardless of the order in which they occur. - // Given this, we immediately emit the checkpoint barriers, so the downstream operators - // can start their checkpoint work as soon as possible + // broadcast barriers after snapshot operators' states. operatorChain.broadcastCheckpointBarrier( - checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()); - - checkpointState(checkpointMetaData); + checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp() + ); --- End diff -- Yes. We have discussed this problem, and the solution will make sure that we keep the existing order of broadcasting barriers first and snapshotting afterwards. The inelegant way is to stop every `Emitter` thread before broadcasting the barrier, but that is a little tricky. Another way is to change the checkpoint lock into a `ReentrantReadWriteLock`. The main thread and the emitter threads acquire the read lock, while the checkpoint thread (or checkpoint procedure) must take the write lock first, which also stops all emitter threads. This way, the main thread and the emitter threads do not block each other.
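A minimal sketch of the read/write lock idea from this comment, under stated assumptions: `CheckpointLockSketch`, `emit`, `checkpoint` and `emittedSoFar` are hypothetical names, the downstream output is modeled as a list of strings, and the operator state is reduced to a count of emitted records. It is not the implementation in the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: emitting threads share the read lock, the checkpoint takes the write lock.
class CheckpointLockSketch {
    private final ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
    // Holders of the read lock may run concurrently, so the output needs its own guard.
    private final List<String> downstream = new ArrayList<>();

    /** Called by the main thread and by emitter threads; they do not exclude each other. */
    void emit(String record) {
        checkpointLock.readLock().lock();
        try {
            synchronized (downstream) {
                downstream.add(record);
            }
        } finally {
            checkpointLock.readLock().unlock();
        }
    }

    /**
     * Called by the checkpoint procedure. While the write lock is held no thread can emit,
     * so the barrier and the snapshot describe the same point in the stream.
     * Returns the "state": the number of records emitted before the barrier.
     */
    int checkpoint(long checkpointId) {
        checkpointLock.writeLock().lock();
        try {
            synchronized (downstream) {
                int recordsBeforeBarrier = downstream.size();
                downstream.add("barrier-" + checkpointId); // broadcast the barrier first
                return recordsBeforeBarrier;                // then snapshot the state
            }
        } finally {
            checkpointLock.writeLock().unlock();
        }
    }

    List<String> emittedSoFar() {
        synchronized (downstream) {
            return new ArrayList<>(downstream);
        }
    }
}
```

One trade-off this sketch makes visible: because several read-lock holders can run at once, any structure they share (here the `downstream` list) still needs separate synchronization.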
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86517044 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Internal +public class AsyncWaitOperator<IN, OUT> + extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> + implements OneInputStreamOperator<IN, OUT> +{ + private final int DEFAULT_BUFFER_SIZE = 1000; + + private static final long serialVersionUID = 1L; + + /** +* {@link TypeSerializer} for inputs while making snapshots. 
+*/ + private transient TypeSerializer inTypeSerializer; + private transient DataOutputSerializer outputSerializer; + + /** +* input stream elements from the state +*/ + private transient List inputsFromState; + + private transient TimestampedCollector collector; + + private transient AsyncCollectorBuffer<IN, OUT> buffer; + + private int bufferSize = DEFAULT_BUFFER_SIZE; + private AsyncDataStream.OutputMode mode; + + public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction) { + super(asyncFunction); + chainingStrategy = ChainingStrategy.ALWAYS; + } + + public void setBufferSize(int size) { + Preconditions.checkArgument(size > 0, "The number of concurrent async operation should be greater than 0."); + bufferSize = size; + } + + public void setMode(AsyncDataStream.OutputMode mode) { + this.mode = mode; + } + + public void init() { + this.buffer = new AsyncCollectorBuffer<>(bufferSize, mode, this); + this.collector = new TimestampedCollector<>(output); + this.buffer.setOutput(collector, output); + + this.outputSerializer = new DataOutputSerializer(128); + } + + @Override + public void setup(StreamTask containingTask, StreamConfig config, Output<StreamRecord> output) { + super.setup(containingTask, config, output); + + this.inTypeSerializer = this.getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader()); + + init(); + } + + @Override + public void open() throws Exception { + super.open(); + +
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86514286 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Internal +public class AsyncWaitOperator<IN, OUT> + extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> + implements OneInputStreamOperator<IN, OUT> +{ + private final int DEFAULT_BUFFER_SIZE = 1000; + + private static final long serialVersionUID = 1L; + + /** +* {@link TypeSerializer} for inputs while making snapshots. 
+*/ + private transient TypeSerializer inTypeSerializer; + private transient DataOutputSerializer outputSerializer; + + /** +* input stream elements from the state +*/ + private transient List inputsFromState; + + private transient TimestampedCollector collector; + + private transient AsyncCollectorBuffer<IN, OUT> buffer; + + private int bufferSize = DEFAULT_BUFFER_SIZE; + private AsyncDataStream.OutputMode mode; + + public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction) { + super(asyncFunction); + chainingStrategy = ChainingStrategy.ALWAYS; + } + + public void setBufferSize(int size) { + Preconditions.checkArgument(size > 0, "The number of concurrent async operation should be greater than 0."); + bufferSize = size; + } + + public void setMode(AsyncDataStream.OutputMode mode) { + this.mode = mode; + } + + public void init() { + this.buffer = new AsyncCollectorBuffer<>(bufferSize, mode, this); + this.collector = new TimestampedCollector<>(output); + this.buffer.setOutput(collector, output); + + this.outputSerializer = new DataOutputSerializer(128); + } + + @Override + public void setup(StreamTask containingTask, StreamConfig config, Output<StreamRecord> output) { + super.setup(containingTask, config, output); + + this.inTypeSerializer = this.getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader()); + + init(); + } + + @Override + public void open() throws Exception { + super.open(); + +
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86514152 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Internal +public class AsyncWaitOperator<IN, OUT> + extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> + implements OneInputStreamOperator<IN, OUT> +{ + private final int DEFAULT_BUFFER_SIZE = 1000; + + private static final long serialVersionUID = 1L; + + /** +* {@link TypeSerializer} for inputs while making snapshots. 
+*/ + private transient TypeSerializer inTypeSerializer; + private transient DataOutputSerializer outputSerializer; + + /** +* input stream elements from the state +*/ + private transient List inputsFromState; + + private transient TimestampedCollector collector; + + private transient AsyncCollectorBuffer<IN, OUT> buffer; + + private int bufferSize = DEFAULT_BUFFER_SIZE; + private AsyncDataStream.OutputMode mode; + + public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction) { + super(asyncFunction); + chainingStrategy = ChainingStrategy.ALWAYS; + } + + public void setBufferSize(int size) { + Preconditions.checkArgument(size > 0, "The number of concurrent async operation should be greater than 0."); + bufferSize = size; + } + + public void setMode(AsyncDataStream.OutputMode mode) { + this.mode = mode; + } + + public void init() { + this.buffer = new AsyncCollectorBuffer<>(bufferSize, mode, this); + this.collector = new TimestampedCollector<>(output); + this.buffer.setOutput(collector, output); + + this.outputSerializer = new DataOutputSerializer(128); + } + + @Override + public void setup(StreamTask containingTask, StreamConfig config, Output<StreamRecord> output) { + super.setup(containingTask, config, output); + + this.inTypeSerializer = this.getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader()); + + init(); + } + + @Override + public void open() throws Exception { + super.open(); + +
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86513230 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class); + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator<IN, OUT> operator; + + /** +* {@link AsyncCollector} queue. +*/ + private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>(); + /** +* A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement} +*/ + private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>(); + /** +* A hash map keeping {@link AsyncCollector} and their node references in the #queue. 
+*/ + private final Map<AsyncCollector<IN, OUT>, SimpleLinkedList.Node> collectorToQueue = new HashMap<>(); + + private final LinkedList finishedCollectors = new LinkedList<>(); + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. +*/ + private TimestampedCollector timestampedCollector; + private Output<StreamRecord> output; + + /** +* Locks and conditions to synchronize with main thread and emitter thread. +*/ + private final Lock lock; + private final Condition notFull; + private final Condition taskDone; + private final Condition isEmpty; + + /** +* Error from user codes. +*/ + private volatile Exception error; + + private final Emitter emitter; + private final Thread emitThread; + + private boolean isCheckpointing; + + public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode mode, AsyncWaitOperator operator) { + Preconditions.checkArgument(maxSize > 0, "Future buffer size should be greater than 0."); + + this.bufferSize = maxSize; + this.mode = mode; + this.operator = operator; + + this.lock = new ReentrantLock(true); + this.notFull = this.lock.newCondition(); + this.taskDone = this.lock.newConditi
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86511814 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class); + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator<IN, OUT> operator; + + /** +* {@link AsyncCollector} queue. +*/ + private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>(); + /** +* A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement} +*/ + private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>(); + /** +* A hash map keeping {@link AsyncCollector} and their node references in the #queue. 
+*/ + private final Map<AsyncCollector<IN, OUT>, SimpleLinkedList.Node> collectorToQueue = new HashMap<>(); + + private final LinkedList finishedCollectors = new LinkedList<>(); + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. +*/ + private TimestampedCollector timestampedCollector; + private Output<StreamRecord> output; + + /** +* Locks and conditions to synchronize with main thread and emitter thread. +*/ + private final Lock lock; + private final Condition notFull; + private final Condition taskDone; + private final Condition isEmpty; + + /** +* Error from user codes. +*/ + private volatile Exception error; + + private final Emitter emitter; + private final Thread emitThread; + + private boolean isCheckpointing; + + public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode mode, AsyncWaitOperator operator) { + Preconditions.checkArgument(maxSize > 0, "Future buffer size should be greater than 0."); + + this.bufferSize = maxSize; + this.mode = mode; + this.operator = operator; + + this.lock = new ReentrantLock(true); + this.notFull = this.lock.newCondition(); + this.taskDone = this.lock.newConditi
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86511685 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class); + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator<IN, OUT> operator; + + /** +* {@link AsyncCollector} queue. +*/ + private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>(); + /** +* A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement} +*/ + private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>(); + /** +* A hash map keeping {@link AsyncCollector} and their node references in the #queue. 
+*/ + private final Map<AsyncCollector<IN, OUT>, SimpleLinkedList.Node> collectorToQueue = new HashMap<>(); + + private final LinkedList finishedCollectors = new LinkedList<>(); + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. +*/ + private TimestampedCollector timestampedCollector; + private Output<StreamRecord> output; + + /** +* Locks and conditions to synchronize with main thread and emitter thread. +*/ + private final Lock lock; + private final Condition notFull; + private final Condition taskDone; + private final Condition isEmpty; + + /** +* Error from user codes. +*/ + private volatile Exception error; + + private final Emitter emitter; + private final Thread emitThread; + + private boolean isCheckpointing; + + public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode mode, AsyncWaitOperator operator) { + Preconditions.checkArgument(maxSize > 0, "Future buffer size should be greater than 0."); + + this.bufferSize = maxSize; + this.mode = mode; + this.operator = operator; + + this.lock = new ReentrantLock(true); + this.notFull = this.lock.newCondition(); + this.taskDone = this.lock.newConditi
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r8650 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86508413 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86508318 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86507829 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86507678 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86507065
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
@@ -0,0 +1,494 @@
[...]
+ private final LinkedList finishedCollectors = new LinkedList<>();
--- End diff --
I changed it to `ArrayDeque`, which is faster than `LinkedList` when iterating over all elements.
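As an illustration of the `ArrayDeque` change mentioned above, here is a minimal sketch of draining a FIFO queue of finished items. The class name `FinishedQueueSketch`, the `drain` helper, and the string elements are hypothetical stand-ins, not code from the PR. `ArrayDeque` is array-backed, so traversal touches contiguous memory instead of following node links; it also rejects `null` elements, which is why `poll()` returning `null` can safely signal an empty queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch, not the PR's code: a FIFO queue of finished items.
class FinishedQueueSketch {

    /** Removes every element from the deque in FIFO order and returns them. */
    static <T> List<T> drain(Deque<T> finished) {
        List<T> drained = new ArrayList<>(finished.size());
        T next;
        // poll() returns null only when the deque is empty,
        // because ArrayDeque does not permit null elements.
        while ((next = finished.poll()) != null) {
            drained.add(next);
        }
        return drained;
    }

    public static void main(String[] args) {
        Deque<String> finished = new ArrayDeque<>();
        finished.add("collector-1");
        finished.add("collector-2");
        finished.add("collector-3");
        System.out.println(drain(finished)); // [collector-1, collector-2, collector-3]
    }
}
```

Because the field is only used through queue operations, declaring it as `Deque` keeps the choice of implementation a one-line change.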
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86506096 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class); + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator<IN, OUT> operator; + + /** +* {@link AsyncCollector} queue. +*/ + private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>(); + /** +* A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement} +*/ + private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>(); --- End diff -- `collectorToStreamElement` is removed. 
Actually, `AsyncCollector` is an internal class and should not be overridden, so `HashMap` and `IdentityHashMap` behave the same for `AsyncCollector` keys here.
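To illustrate the point about `HashMap` versus `IdentityHashMap`: the two only differ when the key class overrides `equals()`/`hashCode()`. The sketch below is not code from the PR; `PlainCollector`, `ValueKey` and `MapIdentityDemo` are hypothetical names used only for this example, and `PlainCollector` merely stands in for a key type that keeps the default `Object` identity semantics.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

/** Hypothetical stand-in for an internal collector: equals()/hashCode() are NOT overridden. */
final class PlainCollector {
    final int id;

    PlainCollector(int id) {
        this.id = id;
    }
}

/** Hypothetical key type that DOES override equals()/hashCode(), for contrast. */
final class ValueKey {
    final int id;

    ValueKey(int id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ValueKey && ((ValueKey) o).id == id;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(id);
    }
}

final class MapIdentityDemo {
    /** Inserts two keys into the given map and returns the resulting number of entries. */
    static int sizeAfterTwoPuts(Map<Object, String> map, Object first, Object second) {
        map.put(first, "a");
        map.put(second, "b");
        return map.size();
    }

    public static void main(String[] args) {
        // Default identity semantics: both map types keep two distinct entries.
        System.out.println(sizeAfterTwoPuts(new HashMap<>(), new PlainCollector(1), new PlainCollector(1)));
        System.out.println(sizeAfterTwoPuts(new IdentityHashMap<>(), new PlainCollector(1), new PlainCollector(1)));

        // Overridden equals(): HashMap merges the two keys, IdentityHashMap does not.
        System.out.println(sizeAfterTwoPuts(new HashMap<>(), new ValueKey(1), new ValueKey(1)));
        System.out.println(sizeAfterTwoPuts(new IdentityHashMap<>(), new ValueKey(1), new ValueKey(1)));
    }
}
```

As long as the key class keeps the default `equals()`/`hashCode()`, either map type gives the same lookup behavior, which is the situation described in the comment.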
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86505374 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class); + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator<IN, OUT> operator; + + /** +* {@link AsyncCollector} queue. +*/ + private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>(); --- End diff -- Cool! I have thought about `LinkedHashMap`, this class meets our requirements perfectly. 
The following two maps, `collectorToStreamElement` and `collectorToQueue`, can also be deleted, and so can `SimpleLinkedList` :-)
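A rough sketch of why a single `LinkedHashMap` could replace a hand-written linked list plus separate lookup maps: it keeps insertion order for iteration and still allows removal by key. This is only an illustration under assumptions, not the buffer implementation from the PR; `OrderedBufferSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical insertion-ordered buffer backed by one LinkedHashMap. */
final class OrderedBufferSketch<K, V> {
    // LinkedHashMap iterates in insertion order by default.
    private final Map<K, V> entries = new LinkedHashMap<>();

    /** Appends an entry at the tail of the buffer. */
    void add(K key, V value) {
        entries.put(key, value);
    }

    /** Removes an entry from any position by key; returns its value, or null if absent. */
    V remove(K key) {
        return entries.remove(key);
    }

    /** Returns the key that was inserted first, or null if the buffer is empty. */
    K oldestKey() {
        Iterator<K> it = entries.keySet().iterator();
        return it.hasNext() ? it.next() : null;
    }

    /** Returns a snapshot of the values in insertion order. */
    List<V> valuesInOrder() {
        return new ArrayList<>(entries.values());
    }

    public static void main(String[] args) {
        OrderedBufferSketch<String, Integer> buffer = new OrderedBufferSketch<>();
        buffer.add("a", 1);
        buffer.add("b", 2);
        buffer.add("c", 3);
        buffer.remove("b"); // removal from the middle keeps the order of the rest
        System.out.println(buffer.oldestKey());
        System.out.println(buffer.valuesInOrder());
    }
}
```

Note that `LinkedHashMap` is not thread-safe, so a buffer shared between the operator thread and callback threads would still need external locking.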
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86493542 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; + +import java.io.IOException; +import java.util.List; + +/** + * {@link AsyncCollector} collects data / error in user codes while processing async i/o. + * + * @param Input type + * @param Output type + */ +@Internal +public class AsyncCollector<IN, OUT> { + private List result; + private Throwable error; + + private boolean isDone = false; + + private final AsyncCollectorBuffer<IN, OUT> buffer; + + public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) { + this.buffer = buffer; + } + + public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer, boolean isDone) { + this(buffer); + this.isDone = isDone; + } + + /** +* Set result +* @param result A list of results. +*/ + public void collect(List result) { --- End diff -- Yes, only for one time. I will update the code doc. 
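The "only once" contract for `collect` mentioned above could also be enforced in code rather than only documented. The following is a minimal sketch under assumptions, not the `AsyncCollector` from the PR: `OneShotCollector` is a hypothetical class, and rejecting a second call with `IllegalStateException` is one possible policy among several.

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical collector that accepts exactly one result list. */
final class OneShotCollector<T> {
    // null means "no result yet"; the first successful compareAndSet wins.
    private final AtomicReference<List<T>> result = new AtomicReference<>();

    /** Stores the result; throws if a result was already collected. */
    void collect(List<T> values) {
        Objects.requireNonNull(values, "values");
        if (!result.compareAndSet(null, values)) {
            throw new IllegalStateException("collect() may only be called once");
        }
    }

    /** Returns true once a result has been collected. */
    boolean isDone() {
        return result.get() != null;
    }

    /** Returns the collected result, or null if collect() has not been called yet. */
    List<T> getResult() {
        return result.get();
    }

    public static void main(String[] args) {
        OneShotCollector<String> collector = new OneShotCollector<>();
        collector.collect(Collections.singletonList("first"));
        try {
            collector.collect(Collections.singletonList("second"));
        } catch (IllegalStateException e) {
            System.out.println("second call rejected: " + e.getMessage());
        }
        System.out.println(collector.getResult());
    }
}
```

Using a single `AtomicReference` keeps the "done" flag and the stored result consistent even if callbacks race on different threads.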
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86493343 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; + +import java.io.IOException; +import java.util.List; + +/** + * {@link AsyncCollector} collects data / error in user codes while processing async i/o. + * + * @param Input type + * @param Output type + */ +@Internal +public class AsyncCollector<IN, OUT> { + private List result; + private Throwable error; + + private boolean isDone = false; + + private final AsyncCollectorBuffer<IN, OUT> buffer; + + public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) { + this.buffer = buffer; --- End diff -- fixed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86493182 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; + +/** + * A helper class to apply {@link AsyncFunction} to a data stream. + * + * {@code + * DataStream input = ... + * AsyncFunction<String, Tuple<String, String>> asyncFunc = ... 
+ * + * AsyncDataStream.orderedWait(input, asyncFunc, 100); + * } + * + */ +public class AsyncDataStream { + public enum OutputMode { ORDERED, UNORDERED } + + private static final int DEFAULT_BUFFER_SIZE = 100; + + private static <IN, OUT> SingleOutputStreamOperator addOperator(DataStream in, + AsyncFunction<IN, OUT> func, + int bufSize, OutputMode mode) { + TypeInformation outTypeInfo = + TypeExtractor.getUnaryOperatorReturnType((Function) func, AsyncFunction.class, false, + true, in.getType(), Utils.getCallLocationName(), true); + + // create transform + AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func)); + operator.setBufferSize(bufSize); + operator.setMode(mode); + + OneInputTransformation<IN, OUT> resultTransform = new OneInputTransformation<>( + in.getTransformation(), + "async wait operator", + operator, + outTypeInfo, + in.getExecutionEnvironment().getParallelism()); + + SingleOutputStreamOperator returnStream = + new SingleOutputStreamOperator<>(in.getExecutionEnvironment(), resultTransform); + + returnStream.getExecutionEnvironment().addOperator(resultTransform); + + return returnStream; + } + + /** +* Add an AsyncWaitOperator. The order of output stream records may be reordered. +* +* @param in Input {@link DataStream} +* @param func AsyncFunction +* @bufSize The max number of async i/o operation that can be triggered +* @return A new {@link SingleOutputStreamOperator}. +*/ + public static <IN, OUT> SingleOutputStreamOperator unorderedWait(DataStream in, + AsyncFunction<IN, OUT> func, + int bufSize) { + return addOperator(in, func, bufSize, OutputMode.UNORDERED); + } + + public static <IN, OUT> SingleOutputStreamOperator unorderedWait(DataStream in, + AsyncFunction<
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86493149 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; + +/** + * A helper class to apply {@link AsyncFunction} to a data stream. + * + * {@code + * DataStream input = ... + * AsyncFunction<String, Tuple<String, String>> asyncFunc = ... 
+ * + * AsyncDataStream.orderedWait(input, asyncFunc, 100); + * } + * + */ +public class AsyncDataStream { + public enum OutputMode { ORDERED, UNORDERED } + + private static final int DEFAULT_BUFFER_SIZE = 100; + + private static <IN, OUT> SingleOutputStreamOperator addOperator(DataStream in, + AsyncFunction<IN, OUT> func, + int bufSize, OutputMode mode) { + TypeInformation outTypeInfo = + TypeExtractor.getUnaryOperatorReturnType((Function) func, AsyncFunction.class, false, + true, in.getType(), Utils.getCallLocationName(), true); + + // create transform + AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func)); + operator.setBufferSize(bufSize); + operator.setMode(mode); + + OneInputTransformation<IN, OUT> resultTransform = new OneInputTransformation<>( + in.getTransformation(), + "async wait operator", + operator, + outTypeInfo, + in.getExecutionEnvironment().getParallelism()); + + SingleOutputStreamOperator returnStream = + new SingleOutputStreamOperator<>(in.getExecutionEnvironment(), resultTransform); + + returnStream.getExecutionEnvironment().addOperator(resultTransform); + + return returnStream; + } + + /** +* Add an AsyncWaitOperator. The order of output stream records may be reordered. +* +* @param in Input {@link DataStream} +* @param func AsyncFunction +* @bufSize The max number of async i/o operation that can be triggered +* @return A new {@link SingleOutputStreamOperator}. +*/ + public static <IN, OUT> SingleOutputStreamOperator unorderedWait(DataStream in, + AsyncFunction<IN, OUT> func, + int bufSize) { + return addOperator(in, func, bufSize, OutputMode.UNORDERED); + } + + public static <IN, OUT> SingleOutputStreamOperator unorderedWait(DataStream in, --- End diff -- fixed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86492545 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; + +/** + * A helper class to apply {@link AsyncFunction} to a data stream. + * + * {@code + * DataStream input = ... + * AsyncFunction<String, Tuple<String, String>> asyncFunc = ... 
+ * + * AsyncDataStream.orderedWait(input, asyncFunc, 100); + * } + * + */ +public class AsyncDataStream { + public enum OutputMode { ORDERED, UNORDERED } + + private static final int DEFAULT_BUFFER_SIZE = 100; + + private static <IN, OUT> SingleOutputStreamOperator addOperator(DataStream in, + AsyncFunction<IN, OUT> func, + int bufSize, OutputMode mode) { + TypeInformation outTypeInfo = + TypeExtractor.getUnaryOperatorReturnType((Function) func, AsyncFunction.class, false, + true, in.getType(), Utils.getCallLocationName(), true); + + // create transform + AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func)); + operator.setBufferSize(bufSize); + operator.setMode(mode); --- End diff -- fixed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86492478 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; + +/** + * A helper class to apply {@link AsyncFunction} to a data stream. + * + * {@code + * DataStream input = ... + * AsyncFunction<String, Tuple<String, String>> asyncFunc = ... 
+ * + * AsyncDataStream.orderedWait(input, asyncFunc, 100); + * } + * + */ +public class AsyncDataStream { + public enum OutputMode { ORDERED, UNORDERED } + + private static final int DEFAULT_BUFFER_SIZE = 100; + + private static <IN, OUT> SingleOutputStreamOperator addOperator(DataStream in, + AsyncFunction<IN, OUT> func, + int bufSize, OutputMode mode) { --- End diff -- fixed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86492216 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, Checkpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public void restoreState(Integer state) throws Exception { + this.start = state; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return start; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + } + Thread.sleep(10); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + + public static void main(String[] args) throws Exception { + + // obtain execution environment and set setBufferTimeout to 1 to enable + // continuous flushing of the output buffers (lowest latency) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() + .setBufferTimeout(1); + + // configurations for the job + String statePath = args[0]; + String cpMode = args[1]; + int maxCount = Integer.valueOf(args[2]); + final int sleepFactor = Integer.valueOf(args[3]); + final float failRatio = Float.valueOf(args[4]); + String mode = args[5]; + int taskNum = Integer.valueOf(args[6]); +
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86486644 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, Checkpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public void restoreState(Integer state) throws Exception { + this.start = state; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return start; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + } + Thread.sleep(10); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + + public static void main(String[] args) throws Exception { + + // obtain execution environment and set setBufferTimeout to 1 to enable + // continuous flushing of the output buffers (lowest latency) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() + .setBufferTimeout(1); + + // configurations for the job + String statePath = args[0]; + String cpMode = args[1]; + int maxCount = Integer.valueOf(args[2]); + final int sleepFactor = Integer.valueOf(args[3]); + final float failRatio = Float.valueOf(args[4]); + String mode = args[5]; + int taskNum = Integer.valueOf(args[6]); +
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86486482 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, Checkpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public void restoreState(Integer state) throws Exception { + this.start = state; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return start; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + } + Thread.sleep(10); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + + public static void main(String[] args) throws Exception { + + // obtain execution environment and set setBufferTimeout to 1 to enable + // continuous flushing of the output buffers (lowest latency) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() + .setBufferTimeout(1); + + // configurations for the job + String statePath = args[0]; + String cpMode = args[1]; + int maxCount = Integer.valueOf(args[2]); + final int sleepFactor = Integer.valueOf(args[3]); + final float failRatio = Float.valueOf(args[4]); + String mode = args[5]; + int taskNum = Integer.valueOf(args[6]); +
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86486085 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. +*/ + private static class SimpleSource implements SourceFunction, Checkpointed { --- End diff -- Emm, I will change it later.
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86486055 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, Checkpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public void restoreState(Integer state) throws Exception { + this.start = state; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return start; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + } + Thread.sleep(10); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + + public static void main(String[] args) throws Exception { + + // obtain execution environment and set setBufferTimeout to 1 to enable + // continuous flushing of the output buffers (lowest latency) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() + .setBufferTimeout(1); + + // configurations for the job + String statePath = args[0]; + String cpMode = args[1]; + int maxCount = Integer.valueOf(args[2]); + final int sleepFactor = Integer.valueOf(args[3]); + final float failRatio = Float.valueOf(args[4]); + String mode = args[5]; + int taskNum = Integer.valueOf(args[6]); +
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86484584 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, Checkpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public void restoreState(Integer state) throws Exception { + this.start = state; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return start; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + } + Thread.sleep(10); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + + public static void main(String[] args) throws Exception { + + // obtain execution environment and set setBufferTimeout to 1 to enable + // continuous flushing of the output buffers (lowest latency) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() + .setBufferTimeout(1); + + // configurations for the job + String statePath = args[0]; + String cpMode = args[1]; + int maxCount = Integer.valueOf(args[2]); + final int sleepFactor = Integer.valueOf(args[3]); + final float failRatio = Float.valueOf(args[4]); + String mode = args[5]; + int taskNum = Integer.valueOf(args[6]); + String
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86482845 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, Checkpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public void restoreState(Integer state) throws Exception { + this.start = state; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return start; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + } + Thread.sleep(10); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + + public static void main(String[] args) throws Exception { + + // obtain execution environment and set setBufferTimeout to 1 to enable + // continuous flushing of the output buffers (lowest latency) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() + .setBufferTimeout(1); --- End diff -- It is optional here ;D. I will remove it later.
[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...
Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629

Hi @tillrohrmann, thanks for your review ;D I will go through each of your comments and update the PR later.

On the first part of the review, the first point is about combining `UNORDERED` mode with `Watermark`. That combination is meaningless, of course. Maybe the graph generator could print an error and stop compiling the graph if `UNORDERED` mode and `Watermark` are enabled at the same time?

Both modes are guaranteed by `AsyncWaitOperator`. When a checkpoint is taken for the operator chain and the `AsyncWaitOperator` is snapshotted, it first fetches all elements in the `AsyncCollectorBuffer` by calling `getStreamElementsInBuffer()`. That call acquires the lock to block the `Emitter` thread and sets a flag named `isCheckpointing` to keep the `Emitter` thread idle, so no finished `AsyncCollector` is passed on to the next operator. `snapshotState()` is called from the head operator to the tail operator, which ensures that all state is captured correctly, because the `Emitter` threads in the parent operators have already stopped working.

I considered using the checkpoint lock in the `Emitter` thread, but when I tested a chain of multiple `AsyncWaitOperator`s, the `Emitter` threads could not fully utilize the parallelism, since they all have to acquire the same lock while collecting outputs. One way to optimize this is a conditional in `performCheckpoint()`: if the operator chain contains an `AsyncWaitOperator`, the barriers are broadcast after `checkpointState()`; otherwise, the original design is used.

Finally, I will add more test cases based on `OneInputStreamTaskTestHarness`.
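The snapshot handshake described in the comment above can be modeled in a few lines of plain Java. This is only a simplified sketch, not the code from the PR: the class `BufferSketch` and the methods `markFinished`, `emitOnce` and `checkpointDone` are hypothetical names chosen for illustration, and only `getStreamElementsInBuffer()` and the `isCheckpointing` flag are taken from the discussion.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified, hypothetical model of the buffer/emitter handshake; not Flink code.
final class BufferSketch {
    private final Object lock = new Object();
    private final Deque<String> finished = new ArrayDeque<>();
    private boolean isCheckpointing = false;

    // Called when an async request has completed.
    void markFinished(String element) {
        synchronized (lock) {
            finished.addLast(element);
        }
    }

    // Snapshot path: take the lock, raise the flag, copy what is still buffered.
    List<String> getStreamElementsInBuffer() {
        synchronized (lock) {
            isCheckpointing = true;
            return new ArrayList<>(finished);
        }
    }

    // Assumed hook that clears the flag once the snapshot has been taken.
    void checkpointDone() {
        synchronized (lock) {
            isCheckpointing = false;
        }
    }

    // One emitter pass: forwards nothing while a checkpoint is in progress.
    List<String> emitOnce() {
        synchronized (lock) {
            List<String> out = new ArrayList<>();
            if (isCheckpointing) {
                return out;
            }
            while (!finished.isEmpty()) {
                out.add(finished.pollFirst());
            }
            return out;
        }
    }
}
```

In this model, elements that finished before the snapshot stay in the buffer until `checkpointDone()` is called, so the snapshot and the downstream operator never disagree about what has been emitted.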
[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...
Github user bjlovegithub commented on the issue: https://github.com/apache/flink/pull/2629 Updated the PR based on the latest review.
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r83573944 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; + +import java.util.List; + +/** + * {@link AsyncCollector} collects data / error in user codes while processing async i/o. + * + * @param Input type + * @param Output type + */ +@Internal +public class AsyncCollector<IN, OUT> { + private List result; + private Throwable error; + + private boolean isDone = false; + + private AsyncCollectorBuffer<IN, OUT> buffer; + + public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) { + this.buffer = buffer; + } + + public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer, boolean isDone) { + this(buffer); + this.isDone = isDone; + } + + /** +* Set result +* @param result A list of results. +*/ + public void collect(List result) { + this.result = result; + isDone = true; + buffer.mark(this); + } + + /** +* Set error +* @param error A Throwable object. 
+*/ + public void collect(Throwable error) { + this.error = error; + isDone = true; + buffer.mark(this); + } + + /** +* Get result. Throw RuntimeException while encountering an error. +* +* @return A List of result. +* @throws RuntimeException RuntimeException wrapping errors from user codes. +*/ + public List getResult() throws RuntimeException { --- End diff -- That clarifies for me how RuntimeException should be used ;D I prefer IOException, since the error comes from the I/O process. I will change RuntimeException to IOException in the code.
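The change proposed in this reply could look roughly like the following. It is a minimal sketch under stated assumptions: `ResultHolder` is a hypothetical stand-in for the collector, the buffer callback is omitted, and the exception message is invented for illustration.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical stand-in for the collector; the buffer.mark(this) callback is left out.
final class ResultHolder<OUT> {
    private List<OUT> result;
    private Throwable error;

    // Store a successful result.
    void collect(List<OUT> result) {
        this.result = result;
    }

    // Store an error raised by user code.
    void collect(Throwable error) {
        this.error = error;
    }

    // Return the result, or wrap the stored error in a checked IOException.
    List<OUT> getResult() throws IOException {
        if (error != null) {
            throw new IOException("Async I/O request failed", error);
        }
        return result;
    }
}
```

With a checked exception, callers have to handle the failure explicitly, and the original error stays available through `getCause()`.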
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r83573419 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, Checkpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public void restoreState(Integer state) throws Exception { + this.start = state; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return start; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + Thread.sleep(10); + } + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + + public static void main(String[] args) throws Exception { + + // obtain execution environment and set setBufferTimeout to 1 to enable + // continuous flushing of the output buffers (lowest latency) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() + .setBufferTimeout(1); + + // configurations for the job + String statePath = args[0]; + String cpMode = args[1]; + int maxCount = Integer.valueOf(args[2]); + final int sleepFactor = Integer.valueOf(args[3]); + final float failRatio = Float.valueOf(args[4]); + String mode = args[5]; + int taskNum = Integer.valueOf(args[6]); + String timeType = args[7]; + + //
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r83571326 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, Checkpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public void restoreState(Integer state) throws Exception { + this.start = state; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return start; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + Thread.sleep(10); --- End diff -- That is right. With the sleep inside the synchronized block, the checkpoint lock is held for an extra 10 ms (`Thread.sleep(10)`) per emitted element, which blocks the checkpoint for that long.
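The fix implied here is to sleep only after the lock has been released, which is also what the later revisions of the example quoted in this thread do. The sketch below reproduces that pattern in plain Java without Flink: `LockScopedSource`, its private lock object and the `emitted` list are hypothetical stand-ins for the source function, `ctx.getCheckpointLock()` and `ctx.collect(...)`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a source that emits under a lock; not the Flink SourceFunction API.
final class LockScopedSource {
    private final Object checkpointLock = new Object();
    private final List<Integer> emitted = new ArrayList<>();
    private final int maxNum;
    private volatile boolean isRunning = true;
    private int start = 0;

    LockScopedSource(int maxNum) {
        this.maxNum = maxNum;
    }

    void run() {
        while (isRunning) {
            // Hold the lock only while emitting and advancing the offset.
            synchronized (checkpointLock) {
                if (start >= maxNum) {
                    return;
                }
                emitted.add(start);
                ++start;
            }
            // Sleep outside the lock so a concurrent snapshot is not delayed.
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // A snapshot needs the lock only for the short read of the offset.
    int snapshotState() {
        synchronized (checkpointLock) {
            return start;
        }
    }

    List<Integer> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }

    void cancel() {
        isRunning = false;
    }
}
```

Because the emit and the offset update still happen atomically under the lock, a snapshot always sees an offset that matches the elements already emitted.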
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
GitHub user bjlovegithub opened a pull request: https://github.com/apache/flink/pull/2629 [FLINK-4391] Provide support for asynchronous operations over streams

PR for [FLIP 12](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673) - implementation of Async I/O in Flink. The newly added operator supports asynchronous operations on streams. By implementing AsyncFunction, I/O access such as fetching data from HBase can easily be done asynchronously. Please refer to AsyncIOExample.java for how to use it.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/bjlovegithub/flink master
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2629.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2629

commit f352d0a8b879164baa8e4922f63ad8dde13c7c9b Author: yushi.wxg <yushi@taobao.com> Date: 2016-10-12T06:31:43Z Add async wait operator
commit 139902396e1c30d88671ac67a54afea145ab7651 Author: yushi.wxg <yushi@taobao.com> Date: 2016-10-13T10:13:32Z 1. add an example job; 2. fix a bug in state serialization in async wait operator; 3. move broadcast barrier after snapshot operator states
commit fc41eedcd48d2dd7d940e744dec86a51b24e4ac5 Author: yushi.wxg <yushi@taobao.com> Date: 2016-10-13T10:25:36Z update IT case
commit 9855f071f04b6efcfc4e10a9f6ef1c38ae264637 Author: yushi.wxg <yushi@taobao.com> Date: 2016-10-13T10:28:39Z adjust the whitespace in IT
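To illustrate the general idea of asynchronous I/O described in the PR text, here is a small sketch in plain Java. It does not use the `AsyncFunction` interface from the PR: `AsyncLookupSketch`, `lookup` and `enrichAll` are hypothetical names, and `lookup` merely stands in for a remote call such as an HBase read. All requests are issued first and the results are gathered afterwards in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration of overlapping I/O requests; not the Flink API.
final class AsyncLookupSketch {
    // Stand-in for a remote lookup (for example a key-value store read).
    static String lookup(int key) {
        return "value-" + key;
    }

    // Issue every lookup without waiting, then collect results in input order.
    static List<String> enrichAll(List<Integer> keys) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int key : keys) {
                pending.add(CompletableFuture.supplyAsync(() -> lookup(key), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

The point of the pattern is that the waiting time of the individual requests overlaps instead of adding up, which is the benefit the new operator is meant to bring to streaming jobs.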