[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...

2016-12-15 Thread bjlovegithub
Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
Hi @tillrohrmann @mproch, I have updated the PR. It now includes a wrapper class for `RuntimeContext` specialized for `RichAsyncFunction`.
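Here is a minimal sketch of the idea behind such a wrapper. It assumes the wrapper forwards task metadata to the real context and rejects state access, which matches the later remark in this thread that only state access is lost. `MiniRuntimeContext`, `AsyncRuntimeContextWrapper`, and `FakeContext` are hypothetical stand-ins. They are not Flink's `RuntimeContext` API, which has many more methods.

```java
import java.util.HashMap;
import java.util.Map;

public class StateRestrictedContextSketch {

    // Hypothetical, heavily reduced context interface; NOT Flink's RuntimeContext.
    public interface MiniRuntimeContext {
        String getTaskName();

        int getIndexOfThisSubtask();

        Object getState(String name);
    }

    // Forwards task metadata to the real context but refuses state access, because
    // the current key may change while an async callback is still running.
    public static final class AsyncRuntimeContextWrapper implements MiniRuntimeContext {
        private final MiniRuntimeContext delegate;

        public AsyncRuntimeContextWrapper(MiniRuntimeContext delegate) {
            this.delegate = delegate;
        }

        @Override
        public String getTaskName() {
            return delegate.getTaskName();
        }

        @Override
        public int getIndexOfThisSubtask() {
            return delegate.getIndexOfThisSubtask();
        }

        @Override
        public Object getState(String name) {
            throw new UnsupportedOperationException(
                "State access is not supported in rich async functions.");
        }
    }

    // In-memory context used only for this demo.
    public static final class FakeContext implements MiniRuntimeContext {
        private final Map<String, Object> state = new HashMap<>();

        public FakeContext() {
            state.put("counter", 42);
        }

        @Override
        public String getTaskName() {
            return "async-lookup";
        }

        @Override
        public int getIndexOfThisSubtask() {
            return 0;
        }

        @Override
        public Object getState(String name) {
            return state.get(name);
        }
    }

    public static void main(String[] args) {
        MiniRuntimeContext ctx = new AsyncRuntimeContextWrapper(new FakeContext());
        System.out.println(ctx.getTaskName()); // metadata is still available
        try {
            ctx.getState("counter");
        } catch (UnsupportedOperationException e) {
            System.out.println("state access rejected");
        }
    }
}
```

Delegating rather than subclassing keeps the wrapper independent of how the underlying context is implemented.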


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-12-14 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r92536062
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+
+/**
+ * Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * 
+ * {@link RichAsyncFunction#getRuntimeContext()} and {@link RichAsyncFunction#getIterationRuntimeContext()} are
+ * not supported because the key may get changed while accessing states in the working thread.
+ *
+ * @param  The type of the input elements.
+ * @param  The type of the returned elements.
+ */
+
+@PublicEvolving
+public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction
+   implements AsyncFunction<IN, OUT> {
+
+   @Override
+   public abstract void asyncInvoke(IN input, AsyncCollector collector) throws Exception;
+
+   @Override
+   public RuntimeContext getRuntimeContext() {
--- End diff --

I agree. Users will not lose anything except state access :-)




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-12-13 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r92315209
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java ---
@@ -0,0 +1,58 @@
+   public RuntimeContext getRuntimeContext() {
--- End diff --

Personally, I prefer the second option. It is handy for user code, despite its limitations.




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-12-13 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r92314677
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java ---
@@ -0,0 +1,58 @@
+   public RuntimeContext getRuntimeContext() {
--- End diff --

Hi @mproch, in our production use case we provide some metrics in the `AsyncWaitOperator`, such as the number of elements in the `AsyncCollectorBuffer`. Perhaps `AsyncWaitOperator` itself should provide those basic statistics as metrics.
The `RichAsyncFunction` is useful for its `open()` and `close()` methods, where the async clients can be initialized and released.
I think there are two options:
1. Remove `RichAsyncFunction`, since `AsyncWaitOperator` currently only supports non-keyed streams.
2. Keep `RichAsyncFunction`, but restrict what can be used through `RichAsyncFunction.getRuntimeContext()`.
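The point about `open()` and `close()` can be sketched as follows: the async client is created once in `open()`, used from `asyncInvoke`, and released in `close()`. This is a minimal sketch under stated assumptions. A thread pool stands in for a real async client, and `MiniRichAsyncFunction` and `LookupFunction` are hypothetical classes, not Flink's `RichAsyncFunction` or `AsyncCollector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AsyncLifecycleSketch {

    // Hypothetical stand-in for a rich async function; NOT Flink's RichAsyncFunction.
    public abstract static class MiniRichAsyncFunction<IN, OUT> {
        public void open() throws Exception {}

        public void close() throws Exception {}

        public abstract void asyncInvoke(IN input, Consumer<List<OUT>> collector) throws Exception;
    }

    // A thread pool plays the role of an async client (e.g. a database driver).
    public static final class LookupFunction extends MiniRichAsyncFunction<String, String> {
        private ExecutorService client;

        @Override
        public void open() {
            // Created once per function instance, not once per element.
            client = Executors.newFixedThreadPool(2);
        }

        @Override
        public void asyncInvoke(String key, Consumer<List<String>> collector) {
            // Returns immediately; the result is handed to the collector on completion.
            CompletableFuture
                .supplyAsync(() -> key.toUpperCase(), client)
                .thenAccept(value -> collector.accept(Collections.singletonList(value)));
        }

        @Override
        public void close() throws InterruptedException {
            // Let in-flight requests finish, then release the client's threads.
            client.shutdown();
            client.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static List<String> run(List<String> inputs) {
        LookupFunction fn = new LookupFunction();
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        fn.open();
        for (String input : inputs) {
            fn.asyncInvoke(input, results::addAll);
        }
        try {
            fn.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<String> sorted = new ArrayList<>(results); // completion order is not deterministic
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("b", "a"))); // prints [A, B]
    }
}
```

Keeping the client in a field that is only touched in `open()` and `close()` means the per-element path never pays the connection setup cost.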




[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...

2016-12-06 Thread bjlovegithub
Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
Hi @tillrohrmann, I have updated the PR. Thanks.




[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...

2016-12-05 Thread bjlovegithub
Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
Hi @tillrohrmann, thanks for your help. I am still working on it: adding more test cases and modifying its internal data structure. It is almost done, but because of other work I cannot work on it for the next few days. I expect to update the PR on Wednesday, after fixing the unit test failures.




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-28 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89741541
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---
@@ -0,0 +1,224 @@
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+@Internal
+public class AsyncWaitOperator<IN, OUT>
+   extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+   implements OneInputStreamOperator<IN, OUT>
+{
+   private final int DEFAULT_BUFFER_SIZE = 1000;
+
+   private static final long serialVersionUID = 1L;
+
+   private final static String STATE_NAME = "_async_wait_operator_state_";
+
+   /**
+* {@link TypeSerializer} for inputs while making snapshots.
+*/
+   private transient StreamElementSerializer inStreamElementSerializer;
+
+   /**
+* input stream elements from the state
+*/
+   private transient ListState recoveredStreamElements;
+
+   private transient TimestampedCollector collector;
+
+   private transient AsyncCollectorBuffer<IN, OUT> buffer;
+
+   /**
+* Checkpoint lock from {@link StreamTask#lock}
+*/
+   private transient Object checkpointLock;
+
+   private int bufferSize = DEFAULT_BUFFER_SIZE;
+   private AsyncDataStream.OutputMode mode;
+
+   /**
+* For test only. Normally this flag is true, indicating that the Emitter Thread
+* in the buffer will work.
+*/
+   private boolean emitFlag = true;
+
+   /**
+* Test serializer used in unit test
+*/
+   private StreamElementSerializer inStreamElementSerializerForTest;
+
+
+   public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction) {
+   super(asyncFunction);
+   chainingStrategy = ChainingStrategy.ALWAYS;
+   }
+
+   public void setBufferSize(int size) {
+   Preconditions.checkArgument(size > 0, "The number of concurrent async operation should be greater than 0.");
+   bufferSize = size;
+   }
+
+   public void setOutputMode(AsyncDataStream.OutputMode mode) {
+   this.mode = mode;
+   }
+
+   @VisibleForTesting
+   public void setEmitFlag(boolean emitFlag) {
+   this.emitFlag = emitFlag;
+   }
+
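`setBufferSize` above bounds the number of concurrent async operations. A minimal sketch of that back-pressure idea follows, assuming a `java.util.concurrent.Semaphore` (not necessarily how the PR's buffer implements it): the submitting thread blocks once `bufferSize` requests are in flight, and a slot is freed whenever a request completes. `BoundedInFlightSketch`, `submit`, and `demo` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedInFlightSketch {
    private final Semaphore slots;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    public BoundedInFlightSketch(int bufferSize) {
        // Same precondition as setBufferSize(): the capacity must be positive.
        if (bufferSize <= 0) {
            throw new IllegalArgumentException(
                "The number of concurrent async operations should be greater than 0.");
        }
        this.slots = new Semaphore(bufferSize);
    }

    // Blocks the calling (task) thread until a slot is free, then starts the async call.
    public CompletableFuture<Integer> submit(int value, ExecutorService client)
            throws InterruptedException {
        slots.acquire();
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        return CompletableFuture
            .supplyAsync(() -> slowDouble(value), client)
            .whenComplete((result, error) -> {
                inFlight.decrementAndGet();
                slots.release(); // free the slot whether the call succeeded or failed
            });
    }

    private static int slowDouble(int value) {
        try {
            Thread.sleep(5); // simulated I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value * 2;
    }

    public int maxInFlight() {
        return maxInFlight.get();
    }

    // Submits `elements` requests with the given capacity and reports the peak concurrency.
    public static int demo(int bufferSize, int elements) {
        ExecutorService client = Executors.newFixedThreadPool(8);
        BoundedInFlightSketch buffer = new BoundedInFlightSketch(bufferSize);
        CompletableFuture<?>[] futures = new CompletableFuture<?>[elements];
        try {
            for (int i = 0; i < elements; i++) {
                futures[i] = buffer.submit(i, client);
            }
            CompletableFuture.allOf(futures).join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            client.shutdown();
        }
        return buffer.maxInFlight();
    }

    public static void main(String[] args) {
        System.out.println(demo(3, 20)); // never more than 3
    }
}
```

Because the counter is incremented only after a permit is acquired and decremented before it is released, the observed peak can never exceed `bufferSize`.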

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-23 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89431529
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
@@ -0,0 +1,453 @@
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+* Keep all {@code AsyncCollector} and their input {@link StreamElement}
+*/
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord> output;
+   private final TimestampedCollector timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord> output,
+   TimestampedCollector collector,
+   Object lock,
+   AsyncWaitOperator operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL.");
+   Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL.");
+
+   this.bufferSize = bufferSize;
+ 
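The `extraStreamElement` Javadoc above explains why a snapshot must also include the element that `processElement` is still trying to insert into a full queue. A minimal single-threaded sketch of that snapshot rule follows, assuming string elements and integer request ids. `SnapshotWithExtraElementSketch`, `tryAdd`, `complete`, and `snapshot` are hypothetical names, and the real locking between the task and checkpoint threads is not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SnapshotWithExtraElementSketch {
    private final int bufferSize;
    // Pending elements keyed by request id, in arrival order (like the LinkedHashMap `queue`).
    private final Map<Integer, String> queue = new LinkedHashMap<>();
    // Element the task thread is still trying to insert while the queue is full.
    private String extraStreamElement;

    public SnapshotWithExtraElementSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Returns false if the queue is full; the element is then remembered as the extra element.
    public boolean tryAdd(int id, String element) {
        if (queue.size() >= bufferSize) {
            extraStreamElement = element;
            return false;
        }
        queue.put(id, element);
        extraStreamElement = null;
        return true;
    }

    // Called once the async result for `id` has been emitted downstream.
    public void complete(int id) {
        queue.remove(id);
    }

    // Everything not yet emitted goes into operator state,
    // including the element that is blocked waiting for space.
    public List<String> snapshot() {
        List<String> state = new ArrayList<>(queue.values());
        if (extraStreamElement != null) {
            state.add(extraStreamElement);
        }
        return state;
    }

    public static void main(String[] args) {
        SnapshotWithExtraElementSketch buffer = new SnapshotWithExtraElementSketch(2);
        buffer.tryAdd(1, "a");
        buffer.tryAdd(2, "b");
        buffer.tryAdd(3, "c");                 // queue full: "c" becomes the extra element
        System.out.println(buffer.snapshot()); // [a, b, c]
        buffer.complete(1);
        buffer.tryAdd(3, "c");                 // retry succeeds once a slot frees up
        System.out.println(buffer.snapshot()); // [b, c]
    }
}
```

Without the extra element, a checkpoint taken while the task thread is blocked would silently drop "c" on recovery.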

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-23 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89424283
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java ---
@@ -0,0 +1,103 @@
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link AsyncCollector} collects data / error in user codes while processing async i/o.
+ *
+ * @param  Input type
+ * @param  Output type
+ */
+@Internal
+public class AsyncCollector<IN, OUT> {
+   private List result;
+   private Throwable error;
+
+   private boolean isDone = false;
+
+   private final AsyncCollectorBuffer<IN, OUT> buffer;
+
+   public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) {
+   Preconditions.checkNotNull(buffer, "Reference to AsyncCollectorBuffer should not be null");
+
+   this.buffer = buffer;
+   }
+
+   public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer, boolean isDone) {
+   this(buffer);
+   this.isDone = isDone;
--- End diff --

Actually, they only serve as stubs here. I think we can optimize away the `AsyncCollector`s for `Watermark` and `LatencyMarker` when we optimize the code in `nothingToDo()`.
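One reading of the stub remark is that a watermark gets an entry in the same insertion-ordered queue as pending records, but that entry is created already done (compare the `isDone` constructor argument above), so the emitter can walk the queue uniformly. A minimal sketch of ordered emission under that assumption follows. `OrderedEmitSketch` and its methods are hypothetical and do not model `nothingToDo()` or unordered mode.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderedEmitSketch {

    // One queue slot: a pending record, or a watermark stub that is done from the start.
    private static final class Entry {
        String output;
        boolean done;

        Entry(String output, boolean done) {
            this.output = output;
            this.done = done;
        }
    }

    private final Map<Integer, Entry> queue = new LinkedHashMap<>();
    private final List<String> emitted = new ArrayList<>();
    private int nextId = 0;

    public int addRecord() {
        int id = nextId++;
        queue.put(id, new Entry(null, false)); // waits for its async result
        return id;
    }

    public void addWatermark(long timestamp) {
        queue.put(nextId++, new Entry("W@" + timestamp, true)); // stub: already done
        emitOrdered();
    }

    public void complete(int id, String result) {
        Entry entry = queue.get(id);
        entry.output = result;
        entry.done = true;
        emitOrdered();
    }

    // Ordered mode: emit from the head only, stopping at the first unfinished entry.
    private void emitOrdered() {
        Iterator<Entry> it = queue.values().iterator();
        while (it.hasNext()) {
            Entry head = it.next();
            if (!head.done) {
                break;
            }
            emitted.add(head.output);
            it.remove();
        }
    }

    public List<String> emitted() {
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        OrderedEmitSketch buffer = new OrderedEmitSketch();
        int first = buffer.addRecord();
        buffer.addWatermark(10L);
        int second = buffer.addRecord();
        buffer.complete(second, "B");         // blocked behind the first record
        System.out.println(buffer.emitted()); // []
        buffer.complete(first, "A");
        System.out.println(buffer.emitted()); // [A, W@10, B]
    }
}
```

Optimizing the stubs away would mean tracking watermark positions separately instead of allocating a full collector entry for each one.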




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-23 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89294937
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
@@ -0,0 +1,453 @@

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-23 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89293393
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
@@ -0,0 +1,453 @@

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-23 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89291235
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * Keep all {@code AsyncCollector} and their input {@link StreamElement}
+    */
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+    * For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the
+    * {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue}
+    * is full since main thread waits on this lock. The StreamElement in
+    * {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements
+    * in its queue. It will be kept in the operator state while snapshotting.
+    */
+   private StreamElement extraStreamElement;
+
+   /**
+    * {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
+    */
+   private final Output<StreamRecord> output;
+   private final TimestampedCollector timestampedCollector;
+
+   /**
+    * Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+    */
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+           int bufferSize,
+           AsyncDataStream.OutputMode mode,
+           Output<StreamRecord> output,
+           TimestampedCollector collector,
+           Object lock,
+           AsyncWaitOperator operator) {
+       Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0.");
+       Preconditions.checkNotNull(output, "Output should not be NULL.");
+       Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL.");
+       Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL.");
+       Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL.");
+
+       this.bufferSize = bufferSize;
+
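The Javadoc on `extraStreamElement` describes a subtle case: while the queue is full the producer waits on the checkpoint lock, which releases it, so a checkpoint can run before the incoming element is enqueued. A minimal sketch of that idea, assuming a plain `Object` monitor in place of `StreamTask#lock` and a generic `E` in place of `StreamElement` (`SnapshotSketch` and its methods are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: remember the element a blocked producer holds and include it in snapshots. */
final class SnapshotSketch<E> {
    private final Object lock = new Object(); // stands in for the checkpoint lock
    private final Map<Object, E> queue = new LinkedHashMap<>(); // insertion-ordered pending entries
    private final int bufferSize;
    private E extraElement; // element whose insertion is blocked because the queue is full

    SnapshotSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Producer side: blocks while the queue is full; wait() releases the lock. */
    void add(E element) throws InterruptedException {
        synchronized (lock) {
            while (queue.size() >= bufferSize) {
                extraElement = element; // a snapshot taken now must not lose this element
                lock.wait();
            }
            extraElement = null;
            queue.put(new Object(), element); // fresh identity key per entry
        }
    }

    /** Emitter side: drops the oldest entry and wakes a blocked producer. */
    void removeHead() {
        synchronized (lock) {
            Iterator<Object> it = queue.keySet().iterator();
            if (it.hasNext()) {
                it.next();
                it.remove();
            }
            lock.notifyAll();
        }
    }

    /** Checkpoint side: queued elements in order, followed by the blocked one, if any. */
    List<E> snapshot() {
        synchronized (lock) {
            List<E> state = new ArrayList<>(queue.values());
            if (extraElement != null) {
                state.add(extraElement);
            }
            return state;
        }
    }
}
```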

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-23 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89289895
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-23 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89285857
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
--- End diff --

Placing `AsyncCollectorBuffer` in the `functions` package does not seem very nice, so I created a `buffer` sub-package under `async` and placed it there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...

2016-11-11 Thread bjlovegithub
Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
Hi @tillrohrmann. I have updated the code again: added the Emitter thread, switched to the latest `OperatorStateStore`, and updated the test code. Thanks




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-07 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86916022
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
 ---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link AsyncCollectorBuffer}. These test that:
+ *
+ * <ul>
+ * <li>Add a new item into the buffer</li>
+ * <li>Ordered mode processing</li>
+ * <li>Unordered mode processing</li>
+ * <li>Error handling</li>
+ * </ul>
+ */
+public class AsyncCollectorBufferTest {
+   private AsyncFunction<Integer, Integer> function;
+
+   private AsyncWaitOperator<Integer, Integer> operator;
+
+   private AsyncCollectorBuffer<Integer, Integer> buffer;
+
+   private Output<StreamRecord> output;
+
+   @Before
+   public void setUp() throws Exception {
+       function = new AsyncFunction<Integer, Integer>() {
+           @Override
+           public void asyncInvoke(Integer input, AsyncCollector<Integer, Integer> collector) throws Exception {
+
+           }
+       };
+
+       operator = new AsyncWaitOperator<>(function);
+       Class[] classes = AbstractStreamOperator.class.getDeclaredClasses();
+       Class latencyClass = null;
+       for (Class c : classes) {
+           if (c.getName().indexOf("LatencyGauge") != -1) {
+               latencyClass = c;
+           }
+       }
+
+       Constructor explicitConstructor = latencyClass.getDeclaredConstructors()[0];
+       explicitConstructor.setAccessible(true);
+       Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10));
+
+       output = new FakedOutput(new ArrayList());
+       TimestampedCollector collector = new TimestampedCollector(output);
+       buffer =
+               new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.ORDERED, operator);
+       buffer.setOutput(collector, output);
+
+       Whitebox.setInternalState(operator, "output", output);
+   }
+
+   @Test
+   public void testAdd() throws Exception {
+       Thread.sleep(1000);
+       buffer.add(new Watermark(0L));
+       buffer.add(new LatencyMarker(111L, 1, 1));
+       Assert.assertEquals(((SimpleLinkedList) Whitebox.getInternalState(buffer, "queue")).size(), 2);
+       Assert.assertEquals(((Map) Whitebox.getInternalState(buffer, "collectorToStreamElement")).size(), 2);
+       Assert.assertEquals(((Map) Whitebox.getInternalState(buffer,
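The test class lists unordered-mode processing among its cases. One common way to define unordered mode, and an assumption here rather than a description of this PR's code, is that finished records may be emitted in any order but never across a watermark. A small sketch with hypothetical `UnorderedEmitSketch` and `Entry` types, using strings for both records and watermarks:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Sketch of unordered emission where watermarks act as barriers. */
final class UnorderedEmitSketch {
    /** Hypothetical queue entry: an async record result or a watermark. */
    static final class Entry {
        final String value;
        final boolean isWatermark;
        private boolean done;

        private Entry(String value, boolean isWatermark) {
            this.value = value;
            this.isWatermark = isWatermark;
            this.done = isWatermark; // watermarks need no async work
        }

        void complete() {
            done = true;
        }
    }

    private final List<Entry> queue = new ArrayList<>();

    Entry addRecord(String value) {
        Entry e = new Entry(value, false);
        queue.add(e);
        return e;
    }

    void addWatermark(String value) {
        queue.add(new Entry(value, true));
    }

    /** Emits finished records ahead of the first watermark, then that watermark once nothing precedes it. */
    void emitReady(List<String> out) {
        boolean progress = true;
        while (progress) {
            progress = false;
            Iterator<Entry> it = queue.iterator();
            while (it.hasNext()) {
                Entry e = it.next();
                if (e.isWatermark) {
                    break; // records after a watermark must not overtake it
                }
                if (e.done) {
                    out.add(e.value);
                    it.remove();
                }
            }
            if (!queue.isEmpty() && queue.get(0).isWatermark) {
                out.add(queue.remove(0).value); // every earlier record is already out
                progress = true;
            }
        }
    }
}
```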

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-07 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86914170
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+   private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class);
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * {@link AsyncCollector} queue.
+    */
+   private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>();
+   /**
+    * A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement}
+    */
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>();
+   /**
+    * A hash map keeping {@link AsyncCollector} and their node references in the #queue.
+    */
+   private final Map<AsyncCollector<IN, OUT>, SimpleLinkedList.Node> collectorToQueue = new HashMap<>();
+
+   private final LinkedList finishedCollectors = new LinkedList<>();
+
+   /**
+    * {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
+    */
+   private TimestampedCollector timestampedCollector;
+   private Output<StreamRecord> output;
+
+   /**
+    * Locks and conditions to synchronize with main thread and emitter thread.
+    */
+   private final Lock lock;
+   private final Condition notFull;
+   private final Condition taskDone;
+   private final Condition isEmpty;
+
+   /**
+    * Error from user codes.
+    */
+   private volatile Exception error;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private boolean isCheckpointing;
+
+   public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode mode, AsyncWaitOperator operator) {
+       Preconditions.checkArgument(maxSize > 0, "Future buffer size should be greater than 0.");
+
+       this.bufferSize = maxSize;
+       this.mode = mode;
+       this.operator = operator;
+
+       this.lock = new ReentrantLock(true);
+       this.notFull = this.lock.newCondition();
+       this.taskDone = this.lock.newConditi
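The constructor above creates a fair `ReentrantLock` with several `Condition`s. A minimal sketch of how conditions like `notFull` and `taskDone` could coordinate the main thread, the async callbacks and the emitter thread; `ConditionBufferSketch`, `Slot`, `complete` and `takeHead` are hypothetical names, not the PR's methods:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class ConditionBufferSketch {
    /** Hypothetical slot holding one async result. */
    static final class Slot {
        String result;
        boolean done;
    }

    private final int bufferSize;
    private final Deque<Slot> queue = new ArrayDeque<>();
    private final Lock lock = new ReentrantLock(true); // fair: waiters acquire roughly in FIFO order
    private final Condition notFull = lock.newCondition();
    private final Condition taskDone = lock.newCondition();

    ConditionBufferSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Main thread: blocks while the buffer is full. */
    Slot add() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= bufferSize) {
                notFull.await();
            }
            Slot slot = new Slot();
            queue.addLast(slot);
            return slot;
        } finally {
            lock.unlock();
        }
    }

    /** Async callback: stores the result and wakes the emitter. */
    void complete(Slot slot, String result) {
        lock.lock();
        try {
            slot.result = result;
            slot.done = true;
            taskDone.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Emitter thread: waits until the head is done, removes it and frees one slot. */
    String takeHead() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty() || !queue.peekFirst().done) {
                taskDone.await();
            }
            Slot head = queue.pollFirst();
            notFull.signal();
            return head.result;
        } finally {
            lock.unlock();
        }
    }
}
```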

[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...

2016-11-06 Thread bjlovegithub
Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
Maybe this is a solution: we would neither change the type of `checkpointLock` (an `Object`, which is quite efficient) nor change the order of `broadcastBarriers` and `operator.snapshotState()`. Instead, we place the code that **pauses** the `EmitterThread` in `StreamTask.performCheckpoint()`, like this:

```java
    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {

        LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());

        synchronized (lock) {
            if (isRunning) {
                // stop the emitter threads of async operators first.
                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator instanceof AsyncWaitOperator) {
                        ((AsyncWaitOperator<?, ?>) operator).pauseEmitterThread();
                    }
                }

                // broadcast barriers first, then snapshot the operators' states.
                operatorChain.broadcastCheckpointBarrier(
                        checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());

                checkpointState(checkpointMetaData);

                return true;
            } else {
                return false;
            }
        }
    }
```




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-06 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86709933
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
 
         synchronized (lock) {
             if (isRunning) {
+                checkpointState(checkpointMetaData);
 
-                // Since both state checkpointing and downstream barrier emission occurs in this
-                // lock scope, they are an atomic operation regardless of the order in which they occur.
-                // Given this, we immediately emit the checkpoint barriers, so the downstream operators
-                // can start their checkpoint work as soon as possible
+                // broadcast barriers after snapshot operators' states.
                 operatorChain.broadcastCheckpointBarrier(
-                        checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
-
-                checkpointState(checkpointMetaData);
+                        checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()
+                );

Can you share the test program and its results? I wrote a sample program to test the performance of acquiring and releasing a lock with `Object` / `ReentrantLock` / `ReentrantReadWriteLock`. When only one thread uses the lock, they give similar results, but with multiple threads, `ReentrantLock` and `ReentrantReadWriteLock` outperform the `Object` lock.
Here is my benchmark program: 
[link](https://github.com/bjlovegithub/JavaLockTest/blob/master/src/LockTest.java)
And these are sampled results from a run on my laptop: 
[stat](https://github.com/bjlovegithub/JavaLockTest/blob/master/stat/LockTest_result.data)
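For readers who want to reproduce the comparison, here is a rough sketch of such a micro-benchmark; it is not the linked program. N threads increment a counter guarded either by a `synchronized` monitor or by a `ReentrantLock`. Timings from a loop like this are noisy (no warm-up, JIT and scheduler effects), so a harness such as JMH would be needed before drawing conclusions; the class names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

final class LockComparisonSketch {
    interface Counter {
        void increment();

        long get();
    }

    /** Counter guarded by a plain Object monitor, like the checkpoint lock. */
    static final class MonitorCounter implements Counter {
        private final Object lock = new Object();
        private long value;

        public void increment() { synchronized (lock) { value++; } }

        public long get() { synchronized (lock) { return value; } }
    }

    /** Counter guarded by a ReentrantLock. */
    static final class ReentrantCounter implements Counter {
        private final ReentrantLock lock = new ReentrantLock();
        private long value;

        public void increment() { lock.lock(); try { value++; } finally { lock.unlock(); } }

        public long get() { lock.lock(); try { return value; } finally { lock.unlock(); } }
    }

    /** Starts all threads together and returns the elapsed wall-clock nanoseconds. */
    static long run(Counter counter, int threads, int perThread) throws InterruptedException {
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        long begin = System.nanoTime();
        start.countDown();
        for (Thread w : workers) {
            w.join();
        }
        return System.nanoTime() - begin;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int threads : new int[] {1, 4}) {
            long monitor = run(new MonitorCounter(), threads, 1_000_000);
            long reentrant = run(new ReentrantCounter(), threads, 1_000_000);
            System.out.printf("threads=%d synchronized=%dms ReentrantLock=%dms%n",
                    threads, monitor / 1_000_000, reentrant / 1_000_000);
        }
    }
}
```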




[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...

2016-11-06 Thread bjlovegithub
Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
Good point ;D

Hmm, I think we have to override `StreamOperator.notifyCheckpointComplete()` in `AsyncWaitOperator`, so that once the `TaskManager` notifies the `Task` that the checkpoint has completed, the `EmitterThread` can resume working as soon as possible.
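A minimal sketch of the pause/resume gate discussed here, assuming the emitter thread calls `awaitOpen()` before each emission, `pause()` is invoked when a checkpoint starts, and `resume()` is invoked from `notifyCheckpointComplete()`. `EmitterGate` and its methods are hypothetical names:

```java
/** Hypothetical gate the emitter thread checks before emitting results. */
final class EmitterGate {
    private boolean paused;

    /** Called when a checkpoint starts (e.g. from performCheckpoint). */
    synchronized void pause() {
        paused = true;
    }

    /** Called when the checkpoint completes (e.g. from notifyCheckpointComplete). */
    synchronized void resume() {
        paused = false;
        notifyAll(); // wake an emitter blocked in awaitOpen()
    }

    /** Blocks the calling (emitter) thread while the gate is paused. */
    synchronized void awaitOpen() throws InterruptedException {
        while (paused) {
            wait();
        }
    }

    synchronized boolean isPaused() {
        return paused;
    }
}
```

One caveat with this design: `notifyCheckpointComplete()` is only called for checkpoints that complete, so a gate paused at checkpoint start would presumably need another way to reopen if a checkpoint is declined or fails.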




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-06 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86706353
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
 ---

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-06 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86706002
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
This case can happen once all elements have arrived. `StreamTask` then closes all operators, which eventually calls `AsyncCollectorBuffer.waitEmpty`, and that method wakes up the `Emitter` thread.
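The shutdown path described above can be sketched with one `ReentrantLock` and two `Condition`s. This is a minimal sketch under simplifying assumptions, not the PR's `AsyncCollectorBuffer`: `DrainingBuffer`, `complete()` and `emitLoop()` are hypothetical names, results are plain strings, and a `StringBuilder` stands in for the downstream `Output`. `waitEmpty()` signals `taskDone` so a parked emitter re-checks its queue, then blocks on `isEmpty` until every registered entry has been emitted.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, heavily simplified stand-in for AsyncCollectorBuffer (not the PR's code). */
class DrainingBuffer {
    private final Lock lock = new ReentrantLock(true);
    private final Condition taskDone = lock.newCondition(); // emitter waits for completed entries
    private final Condition isEmpty = lock.newCondition();  // waitEmpty() waits for a drained buffer
    private final Deque<String> finished = new ArrayDeque<>();
    private final StringBuilder out = new StringBuilder();  // stands in for the downstream Output
    private int pending = 0;                                 // entries added but not yet emitted

    DrainingBuffer() {
        Thread emitter = new Thread(this::emitLoop, "emitter");
        emitter.setDaemon(true); // do not keep the JVM alive
        emitter.start();
    }

    /** Main thread: register a new in-flight entry. */
    void add() {
        lock.lock();
        try {
            pending++;
        } finally {
            lock.unlock();
        }
    }

    /** Async callback: an entry completed, so wake the emitter. */
    void complete(String result) {
        lock.lock();
        try {
            finished.add(result);
            taskDone.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Called while closing: nudge the emitter, then block until everything has been emitted. */
    void waitEmpty() throws InterruptedException {
        lock.lock();
        try {
            taskDone.signalAll();
            while (pending > 0) {
                isEmpty.await(); // releases the lock while waiting
            }
        } finally {
            lock.unlock();
        }
    }

    /** Body of the emitter thread: emit completed entries in FIFO order. */
    private void emitLoop() {
        lock.lock();
        try {
            while (true) {
                while (finished.isEmpty()) {
                    taskDone.await();
                }
                out.append(finished.poll());
                if (--pending == 0) {
                    isEmpty.signalAll();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    String emitted() {
        lock.lock();
        try {
            return out.toString();
        } finally {
            lock.unlock();
        }
    }
}
```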


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86527618
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
 
synchronized (lock) {
if (isRunning) {
+   checkpointState(checkpointMetaData);
 
-   // Since both state checkpointing and downstream barrier emission occurs in this
-   // lock scope, they are an atomic operation regardless of the order in which they occur.
-   // Given this, we immediately emit the checkpoint barriers, so the downstream operators
-   // can start their checkpoint work as soon as possible
+   // broadcast barriers after snapshot operators' states.
operatorChain.broadcastCheckpointBarrier(
-   checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
-
-   checkpointState(checkpointMetaData);
+   checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()
+   );
--- End diff --

Disabling chaining is an option, but forcing it is a little too strict: users may chain operators together precisely to avoid network overhead.
I think it is worth trying `ReentrantReadWriteLock`. I could write a test comparing the current `synchronized` object lock with `ReentrantReadWriteLock`, to measure the performance difference when locking and unlocking many times.
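A rough way to run the comparison mentioned above is a single-threaded loop that times uncontended lock/unlock pairs. The sketch below is hypothetical (`LockCostSketch` is not part of Flink) and only measures the uncontended case; JIT warm-up and dead-code elimination make hand-written timing loops unreliable, so a harness such as JMH would be the better tool for real numbers.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical helper: naive timing of uncontended lock/unlock pairs on one thread. */
class LockCostSketch {
    private static final Object MONITOR = new Object();
    private static final ReentrantReadWriteLock RW = new ReentrantReadWriteLock();
    private static int shared = 1;
    static volatile long sink; // consumes loop results so the loops are not optimized away

    /** Nanoseconds spent on `iterations` synchronized blocks. */
    static long timeSynchronized(int iterations) {
        long sum = 0;
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            synchronized (MONITOR) {
                sum += shared;
            }
        }
        long elapsed = System.nanoTime() - start;
        sink = sum;
        return elapsed;
    }

    /** Nanoseconds spent on `iterations` read-lock acquire/release pairs. */
    static long timeReadLock(int iterations) {
        long sum = 0;
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            RW.readLock().lock();
            try {
                sum += shared;
            } finally {
                RW.readLock().unlock();
            }
        }
        long elapsed = System.nanoTime() - start;
        sink = sum;
        return elapsed;
    }

    public static void main(String[] args) {
        int n = 10_000_000;
        for (int round = 0; round < 3; round++) { // later rounds run after JIT warm-up
            System.out.printf("synchronized: %d ms, read lock: %d ms%n",
                    timeSynchronized(n) / 1_000_000, timeReadLock(n) / 1_000_000);
        }
    }
}
```

Note that the PR's concern is the contended case (main thread, emitter threads and checkpointing competing), which this loop does not exercise.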


[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
For exactly-once processing guarantees: once `AsyncCollectorBuffer.getStreamElementsInBuffer` has finished, `isCheckpointing` is set to true, which makes the `Emitter` thread idle when it checks `nonthingToDo()`. So, by the time snapshotting in `StreamTask` is triggered, all `Emitter` threads have stopped there.
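The pause described above can be sketched as an emitter loop that parks whenever there is nothing to emit or a checkpoint is in progress, plus a checkpoint side that raises the flag and waits until the emitter has parked. This is a simplified, hypothetical sketch rather than the PR's code: `PausableEmitter`, `beginCheckpoint()`, `endCheckpoint()` and `nothingToDo()` (standing in for the PR's `nonthingToDo()`) are illustrative names, and an `emitted` counter replaces the real `Output`.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch (not the PR's code): an emitter that parks while a checkpoint runs. */
class PausableEmitter {
    private final Lock lock = new ReentrantLock(true);
    private final Condition wakeUp = lock.newCondition(); // the emitter parks here
    private final Condition parked = lock.newCondition(); // others wait for the emitter to park
    private boolean isCheckpointing = false;
    private boolean idle = false; // true while the emitter is parked in wakeUp.await()
    private int available = 0;    // completed results not yet emitted
    private int emitted = 0;      // stands in for records passed to output.collect(...)

    /** Plays the role of nonthingToDo(): nothing to emit, or a checkpoint is running. */
    private boolean nothingToDo() {
        return available == 0 || isCheckpointing;
    }

    /** Body of the emitter thread; exits when interrupted. */
    void runEmitter() {
        lock.lock();
        try {
            while (true) {
                while (nothingToDo()) {
                    idle = true;
                    parked.signalAll(); // tell a waiting checkpoint (or caller) we are parked
                    wakeUp.await();     // releases the lock while parked
                }
                idle = false;
                available--;
                emitted++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /** An async result has completed. */
    void addResult() {
        lock.lock();
        try {
            available++;
            wakeUp.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Checkpoint side: raise the flag, then wait until the emitter has parked. */
    void beginCheckpoint() throws InterruptedException {
        lock.lock();
        try {
            isCheckpointing = true;
            while (!idle) {
                parked.await();
            }
        } finally {
            lock.unlock();
        }
    }

    void endCheckpoint() {
        lock.lock();
        try {
            isCheckpointing = false;
            wakeUp.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until at least n results have been emitted. */
    void awaitEmitted(int n) throws InterruptedException {
        lock.lock();
        try {
            while (emitted < n) {
                parked.await();
            }
        } finally {
            lock.unlock();
        }
    }

    int emittedCount() {
        lock.lock();
        try {
            return emitted;
        } finally {
            lock.unlock();
        }
    }
}
```

Because `beginCheckpoint()` only returns once the emitter is parked under the shared lock, nothing is emitted while the snapshot is taken; completed results simply accumulate until `endCheckpoint()`.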


[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86520537
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
 ---
@@ -195,6 +202,70 @@ public Integer map(NonSerializable value) throws Exception {
env.execute();
}
 
+   @Test
+   public void testAsyncWaitOperator() throws Exception {
+   final int numElements = 10;
+
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements)).setParallelism(1);
+
+   AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
+   transient ExecutorService executorService;
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   executorService = Executors.newFixedThreadPool(numElements);
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   executorService.shutdown();
+   }
+
+   @Override
+   public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
+   final AsyncCollector<Tuple2<Integer, NonSerializable>, Integer> collector) throws Exception {
+   this.executorService.submit(new Runnable() {
+   @Override
+   public void run() {
+   // wait for while to simulate async operation here
+   int sleep = (int) (new Random().nextFloat() * 1000);
+   try {
+   Thread.sleep(sleep);
+   List ret = new ArrayList<>();
+   ret.add(input.f0+input.f0);
+   collector.collect(ret);
+   }
+   catch (InterruptedException e) {
+   collector.collect(new ArrayList(0));
+   }
+   }
+   });
+   }
+   };
+
+   DataStream orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1);
+   orderedResult.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
+
+   DataStream unorderedResult = AsyncDataStream.unorderedWait(input, function, 2).setParallelism(1);
+   unorderedResult.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
--- End diff --

I think this is dictated by `StreamingMultipleProgramsTestBase`, which requires the test job to write its output to temporary files so that the result can be verified.


[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86518429
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
 ---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link AsyncCollectorBuffer}. These test that:
+ *
+ * 
+ * Add a new item into the buffer
+ * Ordered mode processing
+ * Unordered mode processing
+ * Error handling
+ * 
+ */
+public class AsyncCollectorBufferTest {
+   private AsyncFunction<Integer, Integer> function;
+
+   private AsyncWaitOperator<Integer, Integer> operator;
+
+   private AsyncCollectorBuffer<Integer, Integer> buffer;
+
+   private Output<StreamRecord> output;
+
+   @Before
+   public void setUp() throws Exception {
+   function = new AsyncFunction<Integer, Integer>() {
+   @Override
+   public void asyncInvoke(Integer input, AsyncCollector<Integer, Integer> collector) throws Exception {
+
+   }
+   };
+
+   operator = new AsyncWaitOperator<>(function);
+   Class[] classes = AbstractStreamOperator.class.getDeclaredClasses();
+   Class latencyClass = null;
+   for (Class c : classes) {
+   if (c.getName().indexOf("LatencyGauge") != -1) {
+   latencyClass = c;
+   }
+   }
+
+   Constructor explicitConstructor = latencyClass.getDeclaredConstructors()[0];
+   explicitConstructor.setAccessible(true);
+   Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10));
+
+   output = new FakedOutput(new ArrayList());
+   TimestampedCollector collector =new TimestampedCollector(output);
+   buffer =
+   new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.ORDERED, operator);
+   buffer.setOutput(collector, output);
+
+   Whitebox.setInternalState(operator, "output", output);
+   }
+
+   @Test
+   public void testAdd() throws Exception {
+   Thread.sleep(1000);
--- End diff --

It is not necessary; I will remove it.


[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86517940
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
 
synchronized (lock) {
if (isRunning) {
+   checkpointState(checkpointMetaData);
 
-   // Since both state checkpointing and downstream barrier emission occurs in this
-   // lock scope, they are an atomic operation regardless of the order in which they occur.
-   // Given this, we immediately emit the checkpoint barriers, so the downstream operators
-   // can start their checkpoint work as soon as possible
+   // broadcast barriers after snapshot operators' states.
operatorChain.broadcastCheckpointBarrier(
-   checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
-
-   checkpointState(checkpointMetaData);
+   checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()
+   );
--- End diff --

Yes. We have discussed this problem, and the solution has to keep the original order: broadcast the barriers first, then snapshot.
The inelegant way is to stop all `Emitter` threads before broadcasting the barrier, but that is a little tricky.
Another way is to change the checkpoint lock into a `ReentrantReadWriteLock`. The main thread and the emitter threads acquire the read lock, while the checkpoint thread (or checkpoint procedure) must take the write lock first, so that all emitter threads stop working as well. In this way, the main thread and the emitter threads do not block each other.
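The read/write-lock proposal can be sketched as follows. This is a hypothetical illustration, not Flink's checkpoint lock: `RwCheckpointLockSketch` and its methods are made-up names, a list stands in for downstream output, and the returned count stands in for operator state. Record emission (from the main thread or an emitter) holds the read lock, so emitters do not block one another, while `checkpoint()` holds the write lock, so no record can slip in between broadcasting the barrier and taking the snapshot.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch: emitters share the read lock, the checkpoint takes the write lock. */
class RwCheckpointLockSketch {
    // Fair mode, so a waiting checkpoint is not starved by a steady stream of readers.
    private final ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(true);
    // Readers run concurrently, so the shared output needs its own synchronization.
    private final List<String> downstream = Collections.synchronizedList(new ArrayList<>());

    /** Main thread or an emitter thread forwards one record under the read lock. */
    void emit(String record) {
        checkpointLock.readLock().lock();
        try {
            downstream.add(record);
        } finally {
            checkpointLock.readLock().unlock();
        }
    }

    /**
     * Waits until all readers have released the read lock and blocks new ones, then
     * broadcasts the barrier and snapshots. Returns the number of records that
     * preceded the barrier (a stand-in for operator state).
     */
    int checkpoint(long checkpointId) {
        checkpointLock.writeLock().lock();
        try {
            downstream.add("barrier-" + checkpointId); // barrier first, as in the original order
            return downstream.size() - 1;              // snapshot: nothing can be emitted in between
        } finally {
            checkpointLock.writeLock().unlock();
        }
    }

    List<String> downstream() {
        synchronized (downstream) {
            return new ArrayList<>(downstream);
        }
    }
}
```

One consequence of this design: because readers no longer exclude each other, anything they share (here the output list) needs its own thread safety, which is why the sketch wraps it with `Collections.synchronizedList`.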


[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86517044
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Internal
+public class AsyncWaitOperator<IN, OUT>
+   extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+   implements OneInputStreamOperator<IN, OUT>
+{
+   private final int DEFAULT_BUFFER_SIZE = 1000;
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* {@link TypeSerializer} for inputs while making snapshots.
+*/
+   private transient TypeSerializer inTypeSerializer;
+   private transient DataOutputSerializer outputSerializer;
+
+   /**
+* input stream elements from the state
+*/
+   private transient List inputsFromState;
+
+   private transient TimestampedCollector collector;
+
+   private transient AsyncCollectorBuffer<IN, OUT> buffer;
+
+   private int bufferSize = DEFAULT_BUFFER_SIZE;
+   private AsyncDataStream.OutputMode mode;
+
+   public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction) {
+   super(asyncFunction);
+   chainingStrategy = ChainingStrategy.ALWAYS;
+   }
+
+   public void setBufferSize(int size) {
+   Preconditions.checkArgument(size > 0, "The number of concurrent async operation should be greater than 0.");
+   bufferSize = size;
+   }
+
+   public void setMode(AsyncDataStream.OutputMode mode) {
+   this.mode = mode;
+   }
+
+   public void init() {
+   this.buffer = new AsyncCollectorBuffer<>(bufferSize, mode, this);
+   this.collector = new TimestampedCollector<>(output);
+   this.buffer.setOutput(collector, output);
+
+   this.outputSerializer = new DataOutputSerializer(128);
+   }
+
+   @Override
+   public void setup(StreamTask containingTask, StreamConfig config, Output<StreamRecord> output) {
+   super.setup(containingTask, config, output);
+
+   this.inTypeSerializer = this.getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
+
+   init();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+  

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86514286
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ---
@@ -0,0 +1,256 @@

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86514152
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ---
@@ -0,0 +1,256 @@

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86513230
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+   private static final Logger LOG = 
LoggerFactory.getLogger(AsyncCollectorBuffer.class);
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * {@link AsyncCollector} queue.
+    */
+   private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>();
+   /**
+    * A hash map from each {@link AsyncCollector} to its corresponding {@link StreamElement}.
+    */
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>();
+   /**
+    * A hash map from each {@link AsyncCollector} to its node reference in #queue.
+    */
+   private final Map<AsyncCollector<IN, OUT>, SimpleLinkedList.Node> collectorToQueue = new HashMap<>();
+
+   private final LinkedList finishedCollectors = new LinkedList<>();
+
+   /**
+    * {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
+    */
+   private TimestampedCollector timestampedCollector;
+   private Output<StreamRecord> output;
+
+   /**
+    * Lock and conditions used to synchronize the main thread and the emitter thread.
+    */
+   private final Lock lock;
+   private final Condition notFull;
+   private final Condition taskDone;
+   private final Condition isEmpty;
+
+   /**
+    * Exception thrown from user code.
+    */
+   private volatile Exception error;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private boolean isCheckpointing;
+
+   public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode mode, AsyncWaitOperator operator) {
+   Preconditions.checkArgument(maxSize > 0, "Future buffer size should be greater than 0.");
+
+   this.bufferSize = maxSize;
+   this.mode = mode;
+   this.operator = operator;
+
+   this.lock = new ReentrantLock(true);
+   this.notFull = this.lock.newCondition();
+   this.taskDone = this.lock.newConditi

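The fields above combine one fair `ReentrantLock` with conditions such as `notFull`, an `AsyncDataStream.OutputMode`, and a separate emitter thread. The method bodies are cut off in this archive, so the following is only a minimal, self-contained sketch of that producer/consumer pattern under stated assumptions: `PendingResultBuffer`, `Slot`, `add`, `complete` and `take` are hypothetical names, the `ordered` flag stands in for the ordered/unordered output modes, and this is not the PR's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch (not Flink's AsyncCollectorBuffer): a bounded buffer of
// in-flight async results shared by a task thread, callback threads and an emitter.
class PendingResultBuffer<T> {

    // One slot per in-flight request; filled in when the async call completes.
    static final class Slot<T> {
        private T result;
        private boolean done;
    }

    private final int capacity;
    private final boolean ordered; // assumed stand-in for ordered vs. unordered output mode
    private final Deque<Slot<T>> slots = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock(true); // fair lock
    private final Condition notFull = lock.newCondition();
    private final Condition resultReady = lock.newCondition();

    PendingResultBuffer(int capacity, boolean ordered) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity should be greater than 0");
        }
        this.capacity = capacity;
        this.ordered = ordered;
    }

    // Task thread: reserves a slot, blocking while the buffer is full (back-pressure).
    Slot<T> add() throws InterruptedException {
        lock.lock();
        try {
            while (slots.size() >= capacity) {
                notFull.await();
            }
            Slot<T> slot = new Slot<>();
            slots.addLast(slot);
            return slot;
        } finally {
            lock.unlock();
        }
    }

    // Async callback thread: stores the result and wakes the emitter.
    void complete(Slot<T> slot, T result) {
        lock.lock();
        try {
            slot.result = result;
            slot.done = true;
            resultReady.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Emitter thread: blocks until some result may be emitted, then frees its slot.
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (true) {
                Slot<T> ready = findEmittable();
                if (ready != null) {
                    slots.remove(ready); // identity-based removal, Slot has no equals()
                    notFull.signal();
                    return ready.result;
                }
                resultReady.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Ordered: only the oldest slot may leave. Unordered: any completed slot may leave.
    private Slot<T> findEmittable() {
        if (ordered) {
            Slot<T> head = slots.peekFirst();
            return (head != null && head.done) ? head : null;
        }
        for (Slot<T> s : slots) {
            if (s.done) {
                return s;
            }
        }
        return null;
    }
}
```

In an operator, the task thread would call `add()` per input record, the async client's callback would call `complete()`, and a dedicated emitter thread (analogous to the `emitter`/`emitThread` fields) would loop on `take()` and forward each result downstream. A single lock keeps the bookkeeping simple at the cost of some contention between these threads.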
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86511814
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

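The quoted diff pairs the `queue` (a `SimpleLinkedList`) with `collectorToQueue`, a map from each collector to its list node. Presumably this lets a finished collector be unlinked in O(1) in unordered mode instead of scanning the queue. `SimpleLinkedList` is not shown in this excerpt, so the following is a minimal sketch of the same idea with hypothetical names (`IndexedLinkedList`, `addLast`, `remove`), assuming collectors compare by identity, which is why it uses `IdentityHashMap`.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical sketch: a doubly linked list plus an element -> node index,
// so any element can be unlinked in O(1) while head-first order is preserved.
class IndexedLinkedList<E> {

    static final class Node<E> {
        final E value;
        Node<E> prev;
        Node<E> next;

        Node(E value) {
            this.value = value;
        }
    }

    private Node<E> head;
    private Node<E> tail;
    // Identity semantics: two distinct collectors are never treated as equal.
    private final Map<E, Node<E>> index = new IdentityHashMap<>();

    void addLast(E value) {
        Node<E> node = new Node<>(value);
        if (tail == null) {
            head = node;
        } else {
            tail.next = node;
            node.prev = tail;
        }
        tail = node;
        index.put(value, node);
    }

    // Unlinks the element's node without traversing the list.
    boolean remove(E value) {
        Node<E> node = index.remove(value);
        if (node == null) {
            return false;
        }
        if (node.prev != null) {
            node.prev.next = node.next;
        } else {
            head = node.next;
        }
        if (node.next != null) {
            node.next.prev = node.prev;
        } else {
            tail = node.prev;
        }
        return true;
    }

    E peekFirst() {
        return head == null ? null : head.value;
    }

    int size() {
        return index.size();
    }
}
```

The trade-off is one extra map entry per buffered element in exchange for constant-time removal from the middle, while `peekFirst()` still gives the oldest element for ordered emission.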
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86511685
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r8650
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86508413
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86508318
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86507829
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86507678
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

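The fields above pair a fair `ReentrantLock` with `Condition` objects such as `notFull`, so the main thread can block once `bufferSize` entries are in flight while an emitter thread drains them. The following is a minimal sketch of that producer/consumer pattern, not the PR's code: `BoundedBuffer`, `put`, and `take` are hypothetical names, and the PR's extra conditions (`taskDone`, `isEmpty`) are left out.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer illustrating the lock/condition pattern;
// it is not the PR's AsyncCollectorBuffer.
class BoundedBuffer<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private final int capacity;
    // Fair lock, mirroring new ReentrantLock(true) in the diff.
    private final Lock lock = new ReentrantLock(true);
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("Buffer size should be greater than 0.");
        }
        this.capacity = capacity;
    }

    // Producer (main thread) side: blocks while the buffer is full.
    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                notFull.await();
            }
            queue.addLast(item);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Consumer (emitter thread) side: blocks while the buffer is empty.
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            T item = queue.removeFirst();
            notFull.signalAll();
            return item;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}
```

The `while` loops around `await()` guard against spurious wakeups, which the `Condition` contract permits.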
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86507065
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+   private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class);
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * {@link AsyncCollector} queue.
+    */
+   private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>();
+   /**
+    * A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement}
+    */
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>();
+   /**
+    * A hash map keeping {@link AsyncCollector} and their node references in the #queue.
+    */
+   private final Map<AsyncCollector<IN, OUT>, SimpleLinkedList.Node> collectorToQueue = new HashMap<>();
+
+   private final LinkedList finishedCollectors = new LinkedList<>();
--- End diff --

I changed it to an `ArrayDeque`, which is faster than `LinkedList` when iterating over all elements.

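Here is a minimal sketch of keeping finished entries in an `ArrayDeque`. It assumes a hypothetical `FinishedQueue` wrapper with a `drainTo` method; neither comes from the PR. `ArrayDeque` stores its elements in a resizable circular array, so a full traversal avoids the per-node pointer chasing of `LinkedList`. No benchmark numbers are implied here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical holder for completed entries, backed by ArrayDeque instead of LinkedList.
class FinishedQueue<T> {
    // Contiguous circular array: iteration walks memory sequentially
    // rather than following node pointers.
    private final Deque<T> finished = new ArrayDeque<>();

    void markFinished(T item) {
        finished.addLast(item); // note: ArrayDeque rejects null elements
    }

    // Removes every finished entry in FIFO order, appends it to out,
    // and returns how many entries were moved.
    int drainTo(List<T> out) {
        int n = 0;
        T item;
        while ((item = finished.pollFirst()) != null) {
            out.add(item);
            n++;
        }
        return n;
    }
}
```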

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86506096
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+   private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class);
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * {@link AsyncCollector} queue.
+    */
+   private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>();
+   /**
+    * A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement}
+    */
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>();
--- End diff --

`collectorToStreamElement` has been removed. Besides, `AsyncCollector` is an internal class whose `equals()` and `hashCode()` are not (and should not be) overridden, so `HashMap` and `IdentityHashMap` behave the same for it here.

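The argument depends on how `HashMap` locates keys: it uses `hashCode()` and `equals()`, and both default to object identity when a class does not override them. The sketch below uses hypothetical `Handle` and `ValueKey` classes to check that the two maps agree for an identity-based class. They diverge only once value equality is defined.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Stand-in for an internal class that does NOT override equals()/hashCode(),
// so both fall back to Object's identity-based implementations.
class Handle {
    final String name;
    Handle(String name) { this.name = name; }
}

// Stand-in for a class WITH value-based equality, where the two maps differ.
class ValueKey {
    final String name;
    ValueKey(String name) { this.name = name; }

    @Override
    public boolean equals(Object o) {
        return o instanceof ValueKey && ((ValueKey) o).name.equals(name);
    }

    @Override
    public int hashCode() { return name.hashCode(); }
}

class MapDemo {
    // Inserts two keys and returns how many entries the map ends up with.
    static int countEntries(Map<Object, Integer> map, Object a, Object b) {
        map.put(a, 1);
        map.put(b, 2);
        return map.size();
    }

    // Returns {HashMap size, IdentityHashMap size} after inserting a and b.
    static int[] sizes(Object a, Object b) {
        Map<Object, Integer> byEquals = new HashMap<>();
        Map<Object, Integer> byIdentity = new IdentityHashMap<>();
        return new int[] { countEntries(byEquals, a, b), countEntries(byIdentity, a, b) };
    }
}
```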



[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86505374
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+   private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class);
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * {@link AsyncCollector} queue.
+    */
+   private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>();
--- End diff --

Cool! I had considered `LinkedHashMap`, and it meets our requirements perfectly. With it, the two maps that follow, `collectorToStreamElement` and `collectorToQueue`, can be deleted, and so can `SimpleLinkedList` :-)

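Here is one way to read the `LinkedHashMap` suggestion, sketched under some assumptions. Keys stand for collectors, values for their stream elements, and a per-entry `done` flag stands in for completion. Insertion order then doubles as arrival order, so neither the custom linked list nor the node-reference map is needed. `OrderedBuffer`, `Slot`, and their methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical ordered buffer: a single LinkedHashMap replaces a linked queue
// plus a collector->element map and a collector->node map.
class OrderedBuffer<K, V> {
    private static final class Slot<E> {
        final E element;
        boolean done;
        Slot(E element) { this.element = element; }
    }

    // Iteration follows insertion (arrival) order; lookup and removal by key are O(1) on average.
    private final LinkedHashMap<K, Slot<V>> slots = new LinkedHashMap<>();

    void add(K key, V element) {
        slots.put(key, new Slot<>(element));
    }

    void markDone(K key) {
        Slot<V> slot = slots.get(key);
        if (slot != null) {
            slot.done = true;
        }
    }

    // Ordered emission: remove and return only the finished prefix, oldest first.
    List<V> emitFinishedPrefix() {
        List<V> out = new ArrayList<>();
        Iterator<Slot<V>> it = slots.values().iterator();
        while (it.hasNext()) {
            Slot<V> slot = it.next();
            if (!slot.done) {
                break;
            }
            out.add(slot.element);
            it.remove();
        }
        return out;
    }

    // Unordered emission can remove any entry directly by key.
    V removeAny(K key) {
        Slot<V> slot = slots.remove(key);
        return slot == null ? null : slot.element;
    }

    int size() {
        return slots.size();
    }
}
```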



[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86493542
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link AsyncCollector} collects data / error in user codes while processing async i/o.
+ *
+ * @param <IN> Input type
+ * @param <OUT> Output type
+ */
+@Internal
+public class AsyncCollector<IN, OUT> {
+   private List result;
+   private Throwable error;
+
+   private boolean isDone = false;
+
+   private final AsyncCollectorBuffer<IN, OUT> buffer;
+
+   public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) {
+      this.buffer = buffer;
+   }
+
+   public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer, boolean isDone) {
+      this(buffer);
+      this.isDone = isDone;
+   }
+
+   /**
+    * Set result
+    * @param result A list of results.
+    */
+   public void collect(List result) {
--- End diff --

Yes, it can be called only once. I will update the code docs.

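If `collect` may be called only once, the code can enforce that contract instead of only documenting it. The sketch below is hypothetical: `SingleUseCollector` and `collectError` do not come from the PR. It assumes a second call should fail loudly with `IllegalStateException`, whereas the PR may choose to ignore or log such a call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical single-use collector; not the PR's AsyncCollector.
class SingleUseCollector<OUT> {
    // Rejects a second completion, even from concurrent callbacks.
    private final AtomicBoolean claimed = new AtomicBoolean(false);
    // Written last, so the volatile write publishes result/error to isDone() readers.
    private volatile boolean done = false;
    private List<OUT> result;
    private Throwable error;

    // Completes with a list of results; may be called only once.
    void collect(List<OUT> values) {
        claim();
        this.result = Collections.unmodifiableList(new ArrayList<>(values));
        done = true;
    }

    // Completes with an error instead; this also counts as the single call.
    void collectError(Throwable t) {
        claim();
        this.error = t;
        done = true;
    }

    boolean isDone() { return done; }

    // Only meaningful after isDone() returns true.
    List<OUT> getResult() { return result; }

    Throwable getError() { return error; }

    private void claim() {
        if (!claimed.compareAndSet(false, true)) {
            throw new IllegalStateException("collect may be called only once");
        }
    }
}
```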



[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86493343
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link AsyncCollector} collects data / error in user codes while processing async i/o.
+ *
+ * @param <IN> Input type
+ * @param <OUT> Output type
+ */
+@Internal
+public class AsyncCollector<IN, OUT> {
+   private List result;
+   private Throwable error;
+
+   private boolean isDone = false;
+
+   private final AsyncCollectorBuffer<IN, OUT> buffer;
+
+   public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) {
+      this.buffer = buffer;
--- End diff --

fixed




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86493182
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+/**
+ * A helper class to apply {@link AsyncFunction} to a data stream.
+ * 
+ * {@code
+ * DataStream input = ...
+ * AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
+ *
+ * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * }
+ * 
+ */
+public class AsyncDataStream {
+   public enum OutputMode { ORDERED, UNORDERED }
+
+   private static final int DEFAULT_BUFFER_SIZE = 100;
+
+   private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(DataStream<IN> in,
+         AsyncFunction<IN, OUT> func,
+         int bufSize, OutputMode mode) {
+      TypeInformation<OUT> outTypeInfo =
+         TypeExtractor.getUnaryOperatorReturnType((Function) func, AsyncFunction.class, false,
+            true, in.getType(), Utils.getCallLocationName(), true);
+
+      // create transform
+      AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func));
+      operator.setBufferSize(bufSize);
+      operator.setMode(mode);
+
+      OneInputTransformation<IN, OUT> resultTransform = new OneInputTransformation<>(
+         in.getTransformation(),
+         "async wait operator",
+         operator,
+         outTypeInfo,
+         in.getExecutionEnvironment().getParallelism());
+
+      SingleOutputStreamOperator<OUT> returnStream =
+         new SingleOutputStreamOperator<>(in.getExecutionEnvironment(), resultTransform);
+
+      returnStream.getExecutionEnvironment().addOperator(resultTransform);
+
+      return returnStream;
+   }
+
+   /**
+    * Add an AsyncWaitOperator. Output stream records may be emitted in a different order than the input records.
+    *
+    * @param in Input {@link DataStream}
+    * @param func AsyncFunction
+    * @param bufSize The max number of async i/o operations that can be triggered
+    * @return A new {@link SingleOutputStreamOperator}.
+    */
+   public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(DataStream<IN> in,
+         AsyncFunction<IN, OUT> func,
+         int bufSize) {
+      return addOperator(in, func, bufSize, OutputMode.UNORDERED);
+   }
+
+   public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(DataStream<IN> in,
+         AsyncFunction<

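The `OutputMode` enum distinguishes `ORDERED` from `UNORDERED` emission. The small simulation below assumes results complete in an arbitrary order. It ignores watermarks and other non-record elements, which the real operator also has to handle. In `UNORDERED` mode each result is emitted as soon as it completes. In `ORDERED` mode a finished result is held until every earlier input has also finished. `EmissionOrder` and `emit` are hypothetical names; only the enum constants come from the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the two output modes; only the enum constants
// mirror AsyncDataStream.OutputMode.
class EmissionOrder {
    enum OutputMode { ORDERED, UNORDERED }

    // completionOrder is assumed to be a permutation of input indices 0..n-1,
    // listed in the order their async requests complete. Returns the order in
    // which results are emitted downstream.
    static List<Integer> emit(List<Integer> completionOrder, OutputMode mode) {
        List<Integer> emitted = new ArrayList<>();
        if (mode == OutputMode.UNORDERED) {
            // Emit each result as soon as its request completes.
            emitted.addAll(completionOrder);
            return emitted;
        }
        // ORDERED: hold finished results until all earlier inputs have finished.
        boolean[] done = new boolean[completionOrder.size()];
        int next = 0; // smallest input index not yet emitted
        for (int idx : completionOrder) {
            done[idx] = true;
            while (next < done.length && done[next]) {
                emitted.add(next);
                next++;
            }
        }
        return emitted;
    }
}
```

For a completion order of `2, 0, 1, 3`, `UNORDERED` emits `2, 0, 1, 3` while `ORDERED` emits `0, 1, 2, 3`. Input 2 finishes first but waits until 0 and 1 have been emitted.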
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-04 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86493149
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+/**
+ * A helper class to apply {@link AsyncFunction} to a data stream.
+ * 
+ * {@code
+ * DataStream input = ...
+ * AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
+ *
+ * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * }
+ * 
+ */
+public class AsyncDataStream {
+   public enum OutputMode { ORDERED, UNORDERED }
+
+   private static final int DEFAULT_BUFFER_SIZE = 100;
+
+   private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(DataStream<IN> in,
+         AsyncFunction<IN, OUT> func,
+         int bufSize, OutputMode mode) {
+      TypeInformation<OUT> outTypeInfo =
+         TypeExtractor.getUnaryOperatorReturnType((Function) func, AsyncFunction.class, false,
+            true, in.getType(), Utils.getCallLocationName(), true);
+
+      // create transform
+      AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func));
+      operator.setBufferSize(bufSize);
+      operator.setMode(mode);
+
+      OneInputTransformation<IN, OUT> resultTransform = new OneInputTransformation<>(
+         in.getTransformation(),
+         "async wait operator",
+         operator,
+         outTypeInfo,
+         in.getExecutionEnvironment().getParallelism());
+
+      SingleOutputStreamOperator<OUT> returnStream =
+         new SingleOutputStreamOperator<>(in.getExecutionEnvironment(), resultTransform);
+
+      returnStream.getExecutionEnvironment().addOperator(resultTransform);
+
+      return returnStream;
+   }
+
+   /**
+    * Add an AsyncWaitOperator. Output stream records may be emitted in a different order than the input records.
+    *
+    * @param in Input {@link DataStream}
+    * @param func AsyncFunction
+    * @param bufSize The max number of async i/o operations that can be triggered
+    * @return A new {@link SingleOutputStreamOperator}.
+    */
+   public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(DataStream<IN> in,
+         AsyncFunction<IN, OUT> func,
+         int bufSize) {
+      return addOperator(in, func, bufSize, OutputMode.UNORDERED);
+   }
+
+   public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(DataStream<IN> in,
--- End diff --

fixed



[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-03 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86492545
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+/**
+ * A helper class to apply {@link AsyncFunction} to a data stream.
+ * 
+ * {@code
+ * DataStream input = ...
+ * AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
+ *
+ * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * }
+ * 
+ */
+public class AsyncDataStream {
+   public enum OutputMode { ORDERED, UNORDERED }
+
+   private static final int DEFAULT_BUFFER_SIZE = 100;
+
+   private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(DataStream<IN> in,
+         AsyncFunction<IN, OUT> func,
+         int bufSize, OutputMode mode) {
+      TypeInformation<OUT> outTypeInfo =
+         TypeExtractor.getUnaryOperatorReturnType((Function) func, AsyncFunction.class, false,
+            true, in.getType(), Utils.getCallLocationName(), true);
+
+      // create transform
+      AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func));
+      operator.setBufferSize(bufSize);
+      operator.setMode(mode);
--- End diff --

fixed




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-03 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86492478
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+/**
+ * A helper class to apply {@link AsyncFunction} to a data stream.
+ * 
+ * {@code
+ * DataStream input = ...
+ * AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
+ *
+ * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * }
+ * 
+ */
+public class AsyncDataStream {
+   public enum OutputMode { ORDERED, UNORDERED }
+
+   private static final int DEFAULT_BUFFER_SIZE = 100;
+
+   private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(DataStream<IN> in,
+         AsyncFunction<IN, OUT> func,
+         int bufSize, OutputMode mode) {
--- End diff --

fixed




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-03 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86492216
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example that illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+   /**
+    * A checkpointed source.
+    */
+   private static class SimpleSource implements SourceFunction<Integer>, Checkpointed<Integer> {
+      private static final long serialVersionUID = 1L;
+
+      private volatile boolean isRunning = true;
+      private int counter = 0;
+      private int start = 0;
+
+      @Override
+      public void restoreState(Integer state) throws Exception {
+         this.start = state;
+      }
+
+      @Override
+      public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+         return start;
+      }
+
+      public SimpleSource(int maxNum) {
+         this.counter = maxNum;
+      }
+
+      @Override
+      public void run(SourceContext<Integer> ctx) throws Exception {
+         while (start < counter && isRunning) {
+            synchronized (ctx.getCheckpointLock()) {
+               ctx.collect(start);
+               ++start;
+            }
+            Thread.sleep(10);
+         }
+      }
+
+      @Override
+      public void cancel() {
+         isRunning = false;
+      }
+   }
+
+
+   public static void main(String[] args) throws Exception {
+
+      // obtain execution environment and call setBufferTimeout(1) to enable
+      // continuous flushing of the output buffers (lowest latency)
+      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+         .setBufferTimeout(1);
+
+      // configurations for the job
+      String statePath = args[0];
+      String cpMode = args[1];
+      int maxCount = Integer.valueOf(args[2]);
+      final int sleepFactor = Integer.valueOf(args[3]);
+      final float failRatio = Float.valueOf(args[4]);
+      String mode = args[5];
+      int taskNum = Integer.valueOf(args[6]);
+

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-03 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86486644
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,194 @@

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-03 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86486482
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,194 @@

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-03 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86486085
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,194 @@
+/**
+ * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}
+ */
+public class AsyncIOExample {
+
+   /**
+    * A checkpointed source.
+    */
+   private static class SimpleSource implements SourceFunction<Integer>, Checkpointed<Integer> {
--- End diff --

Emm, I will change it later.




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-03 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86486055
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,194 @@

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-03 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86484584
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,194 @@

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-03 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r86482845
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,194 @@
+   public static void main(String[] args) throws Exception {
+
+       // obtain execution environment and set setBufferTimeout to 1 to enable
+       // continuous flushing of the output buffers (lowest latency)
+       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+               .setBufferTimeout(1);
--- End diff --

It is optional here ;D. I will remove it later.




[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...

2016-11-03 Thread bjlovegithub
Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
Hi @tillrohrmann, thanks for your review ;D I will go through each of your comments and update the PR later.
Regarding the first part of the review: the first point is about `UNORDERED` mode combined with `Watermark`. This combination is, of course, meaningless. Maybe we could print an error and have the graph generator refuse to compile the graph if `UNORDERED` mode and `Watermark` are enabled at the same time?
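A minimal sketch of such a fail-fast check, assuming a hypothetical `OutputMode` enum and a hypothetical `AsyncModeValidator.validateAsyncMode` helper that the graph generator would call. Neither exists in this PR; the real check could just as well live where the async operator is added to the stream.

```java
import java.util.Objects;

/** Hypothetical output modes, mirroring the ORDERED/UNORDERED modes discussed in this PR. */
enum OutputMode { ORDERED, UNORDERED }

/** Hypothetical fail-fast check, run while the job graph is being generated. */
final class AsyncModeValidator {

    private AsyncModeValidator() {}

    /**
     * Rejects UNORDERED mode when watermarks are in use: in this design, results
     * emitted in completion order carry no guarantee about their position
     * relative to the watermarks that follow them.
     */
    static void validateAsyncMode(OutputMode mode, boolean watermarksEnabled) {
        Objects.requireNonNull(mode, "mode");
        if (mode == OutputMode.UNORDERED && watermarksEnabled) {
            throw new IllegalStateException(
                "UNORDERED async mode cannot be combined with watermarks; use ORDERED instead.");
        }
    }
}
```

Failing while the graph is built surfaces the misconfiguration before the job is submitted, rather than as silently late records at runtime.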

Both of these modes are guaranteed by `AsyncWaitOperator`. When the chained operators are checkpointed and the snapshot of the `AsyncWaitOperator` is taken, the operator first retrieves all elements in the `AsyncCollectorBuffer` by calling `getStreamElementsInBuffer()`. That method first acquires the lock, which blocks the `Emitter` thread, and then sets a flag named `isCheckpointing` that keeps the `Emitter` thread idle. As a result, no finished `AsyncCollector` is passed on to the next operator while the snapshot is taken. `snapshotState()` is called from the head operator to the tail operator, which ensures that all state is captured correctly, because the `Emitter` threads of the upstream operators have already stopped.
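A simplified sketch of that handshake, using a hypothetical `PausableBuffer` in place of `AsyncCollectorBuffer`. Only `getStreamElementsInBuffer()` and the `isCheckpointing` flag come from the description above; `pollForEmit()` (the `Emitter` side) and `checkpointDone()` (resuming the emitter after the snapshot) are illustrative assumptions, and the real buffer also tracks unfinished collectors and ordering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for AsyncCollectorBuffer: holds elements until the emitter forwards them. */
final class PausableBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resumed = lock.newCondition();
    private final Deque<T> elements = new ArrayDeque<>();
    private boolean isCheckpointing = false;

    /** Called by async callbacks when an element is ready to be emitted. */
    void add(T element) {
        lock.lock();
        try {
            elements.addLast(element);
        } finally {
            lock.unlock();
        }
    }

    /** Emitter side: waits while a checkpoint is in progress; returns null if nothing is buffered. */
    T pollForEmit() throws InterruptedException {
        lock.lock();
        try {
            while (isCheckpointing) {
                resumed.await();
            }
            return elements.pollFirst();
        } finally {
            lock.unlock();
        }
    }

    /** Snapshot side: take the lock, idle the emitter, and copy every buffered element. */
    List<T> getStreamElementsInBuffer() {
        lock.lock();
        try {
            isCheckpointing = true;
            return new ArrayList<>(elements);
        } finally {
            lock.unlock();
        }
    }

    /** Clears the flag and wakes the emitter once the snapshot has been written. */
    void checkpointDone() {
        lock.lock();
        try {
            isCheckpointing = false;
            resumed.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Because the flag is only read and written under the same lock, the emitter cannot forward an element between the copy and the end of the snapshot.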

I had considered using the checkpoint lock in the `Emitter` thread, but when testing a case that chains multiple `AsyncWaitOperator`s together, the `Emitter` threads could not fully utilize the parallelism, since they all have to acquire the same lock while collecting outputs. One way to optimize this is to add a condition in `performCheckpoint()`: if there is an `AsyncWaitOperator` in the operator chain, the task broadcasts the barriers after `checkpointState()`; otherwise, it keeps the original design.
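A rough sketch of the proposed condition, assuming the original design broadcasts the barrier before calling `checkpointState()`. `CheckpointStepOrder` and its parameters are hypothetical; the real `performCheckpoint()` also handles the checkpoint lock, checkpoint metadata, and failures, all of which are left out here.

```java
/** Hypothetical model of the step ordering proposed for performCheckpoint(). */
final class CheckpointStepOrder {

    private CheckpointStepOrder() {}

    /**
     * Runs the two checkpoint steps in the proposed order: with an AsyncWaitOperator
     * in the chain, snapshot first and broadcast the barrier afterwards; otherwise
     * keep the original order (barrier first, then snapshot).
     */
    static void performCheckpoint(boolean chainHasAsyncWaitOperator,
                                  Runnable broadcastBarrier,
                                  Runnable checkpointState) {
        if (chainHasAsyncWaitOperator) {
            checkpointState.run();
            broadcastBarrier.run();
        } else {
            broadcastBarrier.run();
            checkpointState.run();
        }
    }
}
```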

Finally, I will add more test cases based on `OneInputStreamTaskTestHarness`.




[GitHub] flink issue #2629: [FLINK-4391] Provide support for asynchronous operations ...

2016-10-17 Thread bjlovegithub
Github user bjlovegithub commented on the issue:

https://github.com/apache/flink/pull/2629
  
Updated the PR based on the latest review.




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-10-16 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r83573944
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.List;
+
+/**
+ * {@link AsyncCollector} collects data / error in user codes while processing async i/o.
+ *
+ * @param <IN> Input type
+ * @param <OUT> Output type
+ */
+@Internal
+public class AsyncCollector<IN, OUT> {
+   private List<OUT> result;
+   private Throwable error;
+
+   private boolean isDone = false;
+
+   private AsyncCollectorBuffer<IN, OUT> buffer;
+
+   public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) {
+       this.buffer = buffer;
+   }
+
+   public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer, boolean isDone) {
+       this(buffer);
+       this.isDone = isDone;
+   }
+
+   /**
+    * Set result
+    * @param result A list of results.
+    */
+   public void collect(List<OUT> result) {
+       this.result = result;
+       isDone = true;
+       buffer.mark(this);
+   }
+
+   /**
+    * Set error
+    * @param error A Throwable object.
+    */
+   public void collect(Throwable error) {
+       this.error = error;
+       isDone = true;
+       buffer.mark(this);
+   }
+
+   /**
+    * Get result. Throw RuntimeException while encountering an error.
+    *
+    * @return A List of result.
+    * @throws RuntimeException RuntimeException wrapping errors from user codes.
+    */
+   public List<OUT> getResult() throws RuntimeException {
--- End diff --

That clarified for me how RuntimeException should be used ;D
I prefer IOException, since the error comes from the I/O process. I will change RuntimeException to IOException in the code.
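A trimmed-down sketch of that change, using a hypothetical `ResultHolder` class rather than the actual `AsyncCollector` (the buffer marking and `isDone` handling are omitted): a recorded error is rethrown from `getResult()` as a checked `IOException` with the original error as its cause, so callers have to handle a failed I/O call explicitly instead of seeing an unchecked exception.

```java
import java.io.IOException;
import java.util.List;

/** Hypothetical, trimmed-down collector: holds either a result list or an error. */
final class ResultHolder<OUT> {
    private List<OUT> result;
    private Throwable error;

    /** Records a successful result. */
    void collect(List<OUT> result) {
        this.result = result;
    }

    /** Records a failure reported by user code. */
    void collect(Throwable error) {
        this.error = error;
    }

    /** Returns the results, or rethrows a recorded error wrapped in a checked IOException. */
    List<OUT> getResult() throws IOException {
        if (error != null) {
            throw new IOException("Async I/O operation failed", error);
        }
        return result;
    }
}
```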




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-10-16 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r83573419
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+   /**
+* A checkpointed source.
+*/
+   private static class SimpleSource implements SourceFunction<Integer>, Checkpointed<Integer> {
+   private static final long serialVersionUID = 1L;
+
+   private volatile boolean isRunning = true;
+   private int counter = 0;
+   private int start = 0;
+
+   @Override
+   public void restoreState(Integer state) throws Exception {
+   this.start = state;
+   }
+
+   @Override
+   public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+   return start;
+   }
+
+   public SimpleSource(int maxNum) {
+   this.counter = maxNum;
+   }
+
+   @Override
+   public void run(SourceContext<Integer> ctx) throws Exception {
+   while (start < counter && isRunning) {
+   synchronized (ctx.getCheckpointLock()) {
+   ctx.collect(start);
+   ++start;
+   Thread.sleep(10);
+   }
+   }
+   }
+
+   @Override
+   public void cancel() {
+   isRunning = false;
+   }
+   }
+
+
+   public static void main(String[] args) throws Exception {
+
+   // obtain execution environment and set setBufferTimeout to 1 to enable
+   // continuous flushing of the output buffers (lowest latency)
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+   .setBufferTimeout(1);
+
+   // configurations for the job
+   String statePath = args[0];
+   String cpMode = args[1];
+   int maxCount = Integer.valueOf(args[2]);
+   final int sleepFactor = Integer.valueOf(args[3]);
+   final float failRatio = Float.valueOf(args[4]);
+   String mode = args[5];
+   int taskNum = Integer.valueOf(args[6]);
+   String timeType = args[7];
+
+   // 

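The example's `main()` reads eight positional arguments (`statePath`, `cpMode`, `maxCount`, `sleepFactor`, `failRatio`, `mode`, `taskNum`, `timeType`) without checking how many were passed. A minimal sketch of parsing them up front in the same order, assuming a hypothetical helper class `AsyncIOExampleArgs` that is not part of the PR. It uses `Integer.parseInt`/`Float.parseFloat`, which return primitives directly instead of boxing through `valueOf`.

```java
// Hypothetical helper, not part of the PR: parses the eight positional
// arguments of AsyncIOExample.main() in the same order and fails fast.
final class AsyncIOExampleArgs {
    final String statePath;
    final String cpMode;
    final int maxCount;
    final int sleepFactor;
    final float failRatio;
    final String mode;
    final int taskNum;
    final String timeType;

    private AsyncIOExampleArgs(String[] args) {
        statePath = args[0];
        cpMode = args[1];
        // parseInt/parseFloat return primitives, so no boxing is involved.
        maxCount = Integer.parseInt(args[2]);
        sleepFactor = Integer.parseInt(args[3]);
        failRatio = Float.parseFloat(args[4]);
        mode = args[5];
        taskNum = Integer.parseInt(args[6]);
        timeType = args[7];
    }

    static AsyncIOExampleArgs parse(String[] args) {
        if (args.length < 8) {
            throw new IllegalArgumentException(
                "expected 8 arguments (statePath cpMode maxCount sleepFactor"
                    + " failRatio mode taskNum timeType), got " + args.length);
        }
        return new AsyncIOExampleArgs(args);
    }
}
```

A malformed number still fails with `NumberFormatException`, which is a subclass of `IllegalArgumentException`, so callers can handle both cases with one catch.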
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-10-16 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r83571326
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,186 @@
+   @Override
+   public void run(SourceContext ctx) throws Exception {
+   while (start < counter && isRunning) {
+   synchronized (ctx.getCheckpointLock()) {
+   ctx.collect(start);
+   ++start;
+   Thread.sleep(10);
--- End diff --

That is right. Sleeping while holding the checkpoint lock delays a pending checkpoint by an extra 10 ms (0.01 second) per emitted record.
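A minimal, Flink-free sketch of the usual fix: emit and advance the counter under the lock, then sleep after releasing it. Assumptions: `lock` stands in for `ctx.getCheckpointLock()`, the `IntConsumer` stands in for `ctx.collect(...)`, and the class name `ThrottledCounterSource` is hypothetical.

```java
import java.util.function.IntConsumer;

// Flink-free sketch; `lock` stands in for ctx.getCheckpointLock() and
// `emit` for ctx.collect(...). Class name is hypothetical.
final class ThrottledCounterSource {
    private final Object lock;
    private final int maxNum;
    private volatile boolean isRunning = true;
    private int start = 0; // next value to emit; this is what gets checkpointed

    ThrottledCounterSource(Object lock, int maxNum) {
        this.lock = lock;
        this.maxNum = maxNum;
    }

    void run(IntConsumer emit, long sleepMillis) throws InterruptedException {
        while (isRunning && start < maxNum) {
            // Emitting and advancing the counter happen atomically with respect
            // to a checkpoint, which takes the same lock.
            synchronized (lock) {
                emit.accept(start);
                ++start;
            }
            // Throttle OUTSIDE the lock so a pending checkpoint is not held up
            // for the duration of the sleep.
            Thread.sleep(sleepMillis);
        }
    }

    /** What a checkpoint would record; read under the same lock. */
    int snapshotState() {
        synchronized (lock) {
            return start;
        }
    }

    void cancel() {
        isRunning = false;
    }
}
```

The emitted value and the snapshotted counter still cannot diverge, because both are touched only while the lock is held; only the throttling delay moves out of the critical section.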




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-10-13 Thread bjlovegithub
GitHub user bjlovegithub opened a pull request:

https://github.com/apache/flink/pull/2629

[FLINK-4391] Provide support for asynchronous operations over streams

PR for [FLIP-12](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673) - Implementation for Async I/O in Flink.

The newly added operator supports asynchronous operations in streaming jobs.
By implementing AsyncFunction, users can perform I/O, such as fetching data
from HBase, asynchronously.
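To illustrate the idea of an asynchronous lookup without Flink on the classpath, here is a minimal sketch of the callback pattern: `asyncInvoke` starts the I/O and returns immediately, and the result is delivered later through a callback. Everything here is an assumption for illustration: `ResultCallback`, `AsyncLookupFunction` and `SimulatedKvLookup` are hypothetical names (not the PR's `AsyncFunction`/`AsyncCollector` API), and a `Map` queried on an `ExecutorService` stands in for an HBase client.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical callback for delivering an async result; NOT Flink's AsyncCollector.
interface ResultCallback<OUT> {
    void complete(List<OUT> results);

    void fail(Throwable error);

    /** Adapts a CompletableFuture into a callback (handy for tests). */
    static <OUT> ResultCallback<OUT> into(CompletableFuture<List<OUT>> future) {
        return new ResultCallback<OUT>() {
            @Override
            public void complete(List<OUT> results) {
                future.complete(results);
            }

            @Override
            public void fail(Throwable error) {
                future.completeExceptionally(error);
            }
        };
    }
}

// Hypothetical function shape: start the I/O and return without blocking.
interface AsyncLookupFunction<IN, OUT> {
    void asyncInvoke(IN input, ResultCallback<OUT> callback);
}

// Simulated key-value lookup standing in for an HBase client: the Map plays
// the remote table and the executor plays the client's I/O threads.
final class SimulatedKvLookup implements AsyncLookupFunction<String, String> {
    private final Map<String, String> table;
    private final ExecutorService executor;

    SimulatedKvLookup(Map<String, String> table, ExecutorService executor) {
        this.table = table;
        this.executor = executor;
    }

    @Override
    public void asyncInvoke(String key, ResultCallback<String> callback) {
        CompletableFuture
            .supplyAsync(() -> table.get(key), executor)
            .whenComplete((value, error) -> {
                if (error != null) {
                    callback.fail(error);
                } else if (value == null) {
                    // No row for this key: emit nothing.
                    callback.complete(Collections.emptyList());
                } else {
                    callback.complete(Collections.singletonList(key + "=" + value));
                }
            });
    }
}
```

Because `asyncInvoke` never blocks, many lookups can be in flight at once. That overlap of I/O latency is what an async operator exploits, as opposed to a synchronous `map` that waits for each request in turn.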

Please refer to AsyncIOExample.java for how to use it.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bjlovegithub/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2629.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2629


commit f352d0a8b879164baa8e4922f63ad8dde13c7c9b
Author: yushi.wxg <yushi@taobao.com>
Date:   2016-10-12T06:31:43Z

Add async wait operator

commit 139902396e1c30d88671ac67a54afea145ab7651
Author: yushi.wxg <yushi@taobao.com>
Date:   2016-10-13T10:13:32Z

1. add an example job 2. fix a bug in state serialization in async wait operator; 3. move broadcast barrier after snapshot operator states

commit fc41eedcd48d2dd7d940e744dec86a51b24e4ac5
Author: yushi.wxg <yushi@taobao.com>
Date:   2016-10-13T10:25:36Z

update IT case

commit 9855f071f04b6efcfc4e10a9f6ef1c38ae264637
Author: yushi.wxg <yushi@taobao.com>
Date:   2016-10-13T10:28:39Z

adjust the whitespace in IT



