[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3838


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-16 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133392462
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. It is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

My bad, I missed the option of using Java unchecked exceptions (e.g. `RuntimeException`). It'll be much better to use one here.
As for `read`, we can either return `null` or, again, throw a runtime exception.
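A minimal sketch of the unchecked-exception approach, using only `java.io`. The class and method names are hypothetical stand-ins for the `SerializationUtils.serializeObject`/`deserializeObject` helpers in the diff (whose bodies are not shown); the point is that an `IOException` is rethrown as an `UncheckedIOException` instead of being printed and swallowed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Hypothetical helper: Java serialization that rethrows checked failures as unchecked ones. */
class UncheckedSerialization {

    static byte[] serialize(Object o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            // e.g. NotSerializableException: surface it instead of emitting a partial record
            throw new UncheckedIOException("Could not serialize object of type "
                    + (o == null ? "null" : o.getClass().getName()), e);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException("Could not deserialize object", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of serialized object not found", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(deserialize(serialize("hello")));
        try {
            serialize(new Object()); // java.lang.Object is not Serializable
        } catch (UncheckedIOException e) {
            System.out.println("Failed as expected: " + e.getCause());
        }
    }
}
```

Because `UncheckedIOException` keeps the original exception as its cause, the stack trace that fails the task still points at the real serialization problem.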


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133192783
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.util.Collector;
+import org.python.core.PyObject;
+
+/**
+ * Collects a {@code PyObject} record and forwards it. It makes sure that the record is converted,
+ * if necessary, to a {@code PyObject}.
+ */
+@Public
+public class PythonCollector implements Collector {
+   private Collector collector;
+
+   public void setCollector(Collector collector) {
+   this.collector = collector;
+   }
+
+   @Override
+   public void collect(PyObject record) {
+   PyObject po = UtilityFunctions.adapt(record);
--- End diff --

Actually, you're right. This line can be dropped. And now it seems that `PythonCollector` is redundant, although it adds a safety layer that reports casting problems to users who provide a UDF written in Java. If their function does not handle `PyObject`, it will break at runtime with an error saying something like "object cannot be cast to 'PyObject'".
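The safety layer described above can be sketched as a collector wrapper that checks each record's runtime type before forwarding it. `SimpleCollector` and `TypeCheckingCollector` are hypothetical (Flink's `Collector` also has a `close()` method), and `String` stands in for `PyObject` so the sketch runs without Jython.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal collector interface. */
interface SimpleCollector<T> {
    void collect(T record);
}

/**
 * Forwards records after checking their runtime type, so a user function that emits the
 * wrong type fails with a descriptive message instead of a bare ClassCastException later.
 */
class TypeCheckingCollector<T> implements SimpleCollector<Object> {
    private final Class<T> expectedType;
    private final SimpleCollector<T> downstream;

    TypeCheckingCollector(Class<T> expectedType, SimpleCollector<T> downstream) {
        this.expectedType = expectedType;
        this.downstream = downstream;
    }

    @Override
    public void collect(Object record) {
        if (record != null && !expectedType.isInstance(record)) {
            throw new IllegalArgumentException("User function emitted a "
                    + record.getClass().getName() + ", but " + expectedType.getName()
                    + " was expected");
        }
        downstream.collect(expectedType.cast(record));
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        TypeCheckingCollector<String> collector = new TypeCheckingCollector<>(String.class, out::add);
        collector.collect("ok");
        System.out.println(out);
        try {
            collector.collect(42);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The trade-off is one `isInstance` check per record in exchange for an error message that names both the emitted and the expected type.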


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133171218
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. It is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

`ObjectOutputStream`s do not fail with an `IOException` if the object is null.
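A quick way to check this with plain `java.io`: writing `null` succeeds and emits only the 4-byte stream header plus a `TC_NULL` marker, which reads back as `null`. The class name below is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamConstants;

/** Demonstrates that ObjectOutputStream serializes a null reference without an IOException. */
class NullRoundTrip {

    static byte[] write(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o); // null is encoded as the TC_NULL marker, not rejected
        }
        return bos.toByteArray();
    }

    static Object read(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = write(null);
        boolean endsWithNullMarker = bytes[bytes.length - 1] == ObjectStreamConstants.TC_NULL;
        System.out.println(bytes.length + " bytes, ends with TC_NULL: " + endsWithNullMarker
                + ", read back: " + read(bytes));
    }
}
```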


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133170150
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python UDF {@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it internally in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * This function is used internally by the Python thin wrapper layer over the streaming data
+ * functionality
+ */
+public class PythonOutputSelector implements OutputSelector {
+   private static final long serialVersionUID = 909266346633598177L;
+
+   private final byte[] serFun;
+   private transient OutputSelector fun;
+
+   public PythonOutputSelector(OutputSelector fun) throws IOException {
+   this.serFun = SerializationUtils.serializeObject(fun);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public Iterable select(PyObject value) {
+   if (this.fun == null) {
+   try {
+   this.fun = (OutputSelector) SerializationUtils.deserializeObject(this.serFun);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

Done.


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133169625
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. It is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
+   }
+   }
+
+   public PyObject read (Kryo kryo, Input input, Class type) {
+   int len = input.readInt();
+   byte[] serPo = new byte[len];
+   input.read(serPo);
+   PyObject po = null;
+   try {
+   po = (PyObject) SerializationUtils.deserializeObject(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

Yes, same as the answer above. To verify it during debugging, I set the returned value to null, and execution continued without issues.
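The `write`/`read` pair quoted in this hunk frames each record as an `int` length followed by the serialized bytes. Below is a sketch of that framing with plain `java.io` streams (hypothetical class name, not the Kryo `Input`/`Output` API). It uses `readFully`, because a single `InputStream.read(byte[])` call may return fewer bytes than requested.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Length-prefixed framing: a 4-byte length, then exactly that many payload bytes. */
class LengthPrefixedFraming {

    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    static byte[] readFrame(DataInputStream in) throws IOException {
        int len = in.readInt();
        byte[] payload = new byte[len];
        in.readFully(payload); // blocks until len bytes are read, or throws EOFException
        return payload;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            writeFrame(out, "first".getBytes(StandardCharsets.UTF_8));
            writeFrame(out, "second".getBytes(StandardCharsets.UTF_8));
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            System.out.println(new String(readFrame(in), StandardCharsets.UTF_8));
            System.out.println(new String(readFrame(in), StandardCharsets.UTF_8));
        }
    }
}
```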


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133169331
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. It is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

Yes, indeed. According to https://github.com/EsotericSoftware/kryo#serializers: "By default, serializers do not need to handle the object being null."
Nevertheless, I added a message to the log.


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133159049
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python UDF {@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it internally in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * This function is used internally by the Python thin wrapper layer over the streaming data
+ * functionality
+ */
+public class PythonOutputSelector implements OutputSelector {
+   private static final long serialVersionUID = 909266346633598177L;
+
+   private final byte[] serFun;
+   private transient OutputSelector fun;
+
+   public PythonOutputSelector(OutputSelector fun) throws IOException {
+   this.serFun = SerializationUtils.serializeObject(fun);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public Iterable select(PyObject value) {
+   if (this.fun == null) {
+   try {
+   this.fun = (OutputSelector) SerializationUtils.deserializeObject(this.serFun);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

I would throw a RuntimeException to fail the job. If the function can't be deserialized, something went terribly wrong during deployment.

Returning null would cause an NPE later on in `DirectedOutput`, obfuscating 
what actually caused the error.
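A minimal sketch of this suggestion: a wrapper that keeps the user function as bytes, deserializes it lazily on first use and, since the method it implements declares no checked exceptions, rethrows any failure as a `RuntimeException` with the original cause attached. `LazySelector`, `ParitySelector`, and the use of `java.util.function.Function` in place of Flink's `OutputSelector` are assumptions made so the sketch runs without Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.function.Function;

/** Hypothetical stand-in for PythonOutputSelector that fails fast on deserialization errors. */
class LazySelector implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] serFun;
    private transient Function<String, Iterable<String>> fun;

    LazySelector(byte[] serFun) {
        this.serFun = serFun;
    }

    @SuppressWarnings("unchecked")
    Iterable<String> select(String value) {
        if (fun == null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serFun))) {
                fun = (Function<String, Iterable<String>>) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                // Fail the job here, with the real cause attached, rather than returning
                // null and letting an NPE surface later in unrelated code.
                throw new RuntimeException("Could not deserialize the wrapped selector", e);
            }
        }
        return fun.apply(value);
    }

    /** Hypothetical selector routing even-length values to "even", the rest to "odd". */
    static class ParitySelector implements Function<String, Iterable<String>>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<String> apply(String value) {
            return Collections.singletonList(value.length() % 2 == 0 ? "even" : "odd");
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        LazySelector ok = new LazySelector(serialize(new ParitySelector()));
        System.out.println(ok.select("ab")); // routed to "even"
        try {
            new LazySelector(new byte[] {0, 1, 2, 3}).select("ab");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + ", caused by " + e.getCause());
        }
    }
}
```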


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133156452
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python UDF {@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it internally in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * This function is used internally by the Python thin wrapper layer over the streaming data
+ * functionality
+ */
+public class PythonOutputSelector implements OutputSelector {
+   private static final long serialVersionUID = 909266346633598177L;
+
+   private final byte[] serFun;
+   private transient OutputSelector fun;
+
+   public PythonOutputSelector(OutputSelector fun) throws IOException {
+   this.serFun = SerializationUtils.serializeObject(fun);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public Iterable select(PyObject value) {
+   if (this.fun == null) {
+   try {
+   this.fun = (OutputSelector) SerializationUtils.deserializeObject(this.serFun);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

The thing is that the `select` method of the `OutputSelector` interface does not declare any checked exceptions. This is why I catch them here and, in the following lines, check whether `this.fun` is null; if it is, the function returns null. How else could I handle this?


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133154903
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
+   final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

Instead, I've provided a simplified version of it. I'll send it for review.


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133132675
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment get_execution_environment() {
+   return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution environment
+* will run the program in a multi-threaded fashion in the same JVM as the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  Pass a custom configuration into the cluster
+* @return A local execution environment with the specified parallelism.
+*/
+   public static 

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-14 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132967578
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in a python user-defined
+ * function (UDF).
+ * The fact that this field is not set, results in a dynamic calculation of this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class is created. Its name is
+ * constructed using the following pattern:
+ * {@code org.python.proxies.$$}. The {@code }
+ * part is increased by one in runtime, for every job submission. It results in different serial
+ * version UID for each run for the same Python class. Therefore, it is required to silently
+ * suppress the serial version UID mismatch check.
+ * suppress the serial version UID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
+
+   public PythonObjectInputStream2(InputStream in) throws IOException {
+   super(in);
+   }
+
+   protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
+   ObjectStreamClass resultClassDescriptor = super.readClassDescriptor(); // initially streams descriptor
+
+   Class localClass;
+   try {
+   localClass = resolveClass(resultClassDescriptor);
+   } catch (ClassNotFoundException e) {
+   System.out.println("No local class for " + resultClassDescriptor.getName());
--- End diff --

The purpose of this function is to try to resolve the class from the given
class descriptor. If that fails, it probably means that the Jython interpreter
has not been initialised yet, in which case it is initialised. This is handled in
`org.apache.flink.streaming.python.api.functions.UtilityFunctions::smartFunctionDeserialization`.

I'm currently checking whether the `catch` here is redundant, a leftover from
the debugging phase. We can probably let the exception propagate up the call
stack.
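The approach described in the `PythonObjectInputStream2` Javadoc (silently suppressing the serialVersionUID mismatch), combined with letting the `ClassNotFoundException` propagate, could look roughly like the sketch below. It is a hypothetical, self-contained variant: it extends plain `java.io.ObjectInputStream` rather than Jython's `PythonObjectInputStream`, and the name `LenientObjectInputStream` is illustrative only. It is only safe when the stream class and the local class have the same field layout, which is the situation described above, where only the generated UID differs between runs.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

/**
 * Hypothetical sketch: an ObjectInputStream that tolerates serialVersionUID
 * mismatches by substituting the descriptor of the locally loaded class.
 * Assumes the stream and local classes have identical serializable fields.
 */
public class LenientObjectInputStream extends ObjectInputStream {

    public LenientObjectInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
        // Descriptor exactly as written into the stream, including the writer's UID.
        ObjectStreamClass streamDescriptor = super.readClassDescriptor();

        // No catch block: a ClassNotFoundException propagates up the call stack.
        Class<?> localClass = resolveClass(streamDescriptor);
        ObjectStreamClass localDescriptor = ObjectStreamClass.lookup(localClass);

        if (localDescriptor != null
                && localDescriptor.getSerialVersionUID() != streamDescriptor.getSerialVersionUID()) {
            // Returning the local descriptor makes the later UID comparison succeed.
            return localDescriptor;
        }
        return streamDescriptor;
    }
}
```

The design choice here is to override only `readClassDescriptor`, so every other part of the standard deserialization protocol stays untouched.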


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-13 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132843982
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment get_execution_environment() {
+   return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution environment
+* will run the program in a multi-threaded fashion in the same JVM as the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  Pass a custom configuration into the cluster
+* @return A local execution environment with the specified parallelism.
+*/
+   public 

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-13 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132842489
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
+   final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

I agree that the function names here are a bit confusing. In essence, this
function locates a single test file, while `getFilesInFolder`, called on the
next line, collects the files whose names start with `test_`, so the main test
file `run_all_tests.py` is filtered out and not included.
To make this more readable and robust, I changed `getFilesInFolder` to take an
additional `excludes` argument, and I now call it with the main test file in
`excludes`.
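A minimal sketch of what a `getFilesInFolder` with an `excludes` argument might look like. The exact signature in the PR is not shown, so the parameter types, the return type and the class name `TestFileCollector` are assumptions; it also uses `java.io.File` instead of Flink's `FileSystem` to stay self-contained.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical helper mirroring the described behaviour of getFilesInFolder. */
public final class TestFileCollector {

    private TestFileCollector() {
    }

    /**
     * Returns the absolute paths of regular files in {@code folder} whose names start
     * with {@code test_}, skipping every file name listed in {@code excludes}.
     */
    public static List<String> getFilesInFolder(String folder, Collection<String> excludes) {
        List<String> result = new ArrayList<>();
        File[] entries = new File(folder).listFiles();
        if (entries == null) {
            // Not a directory, or it could not be read.
            return result;
        }
        Arrays.sort(entries); // deterministic order across platforms
        for (File entry : entries) {
            String fileName = entry.getName();
            if (entry.isFile() && fileName.startsWith("test_") && !excludes.contains(fileName)) {
                result.add(entry.getAbsolutePath());
            }
        }
        return result;
    }
}
```

With this shape, `prepareDefaultArgs` could pass `Collections.singleton(defaultPythonScriptName)` as `excludes`, so the exclusion no longer depends implicitly on the `test_` prefix.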




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-13 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132841381
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,104 @@
+
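+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.4-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-streaming-python_${scala.binary.version}</artifactId>
+    <name>flink-streaming-python</name>
+    <packaging>jar</packaging>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <mainClass>org.apache.flink.streaming.python.api.PythonStreamBinder</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.python</groupId>
+            <artifactId>jython-standalone</artifactId>
+            <version>2.7.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka-0.9_2.10</artifactId>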
--- End diff --

Done for the Scala version.

As for not being able to run the tests:
1. I fixed the issue with the `TypeError: object of type 'java.lang.Class' has no len()`.
2. I still can't reproduce the main issue, concerning the import of a Java class from the Python module:
File: `flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py`
Line 19: `from org.apache.flink.api.java.utils import ParameterTool`

The given class (`ParameterTool`) resides in a different module, `flink-java`, and the Jython module cannot find it. It is probably related to the CLASSPATH.

Any suggestion for how to reproduce it?
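One way to narrow down the `ParameterTool` import failure is to check, from the same JVM that hosts the Jython interpreter, whether the class is visible to the JVM at all. The sketch below is a hypothetical diagnostic (`ClasspathProbe` and `isLoadable` are illustrative names, not part of the PR). It only checks JVM-level class loading; Jython's own Java-package import machinery may impose further conditions that it does not cover.

```java
import java.io.File;

/** Hypothetical diagnostic: is a class loadable, and what is on the JVM classpath? */
public final class ClasspathProbe {

    private ClasspathProbe() {
    }

    /** Returns true if {@code className} can be loaded by {@code loader}, without initializing it. */
    public static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String className = args.length > 0 ? args[0] : "org.apache.flink.api.java.utils.ParameterTool";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println(className + " loadable: " + isLoadable(className, loader));
        // One classpath entry per line, to make diffs between environments easy.
        for (String entry : System.getProperty("java.class.path", "").split(File.pathSeparator)) {
            System.out.println("  " + entry);
        }
    }
}
```

Comparing the printed classpath between a run where the import fails and one where it succeeds may show whether the `flink-java` jar is simply missing in the failing setup.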




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121893907
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment get_execution_environment() {
+   return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution environment
+* will run the program in a multi-threaded fashion in the same JVM as the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  Pass a custom configuration into the cluster
+* @return A local execution environment with the specified parallelism.
+*/
+   public static 

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121889539
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
+   final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
+   List filesInTestPath = getFilesInFolder(testFullPath.getParent());
+
+   String[] args = new String[filesInTestPath.size() + 1];
+   args[0] = testFullPath.getAbsolutePath();
+
+   for (final ListIterator it = filesInTestPath.listIterator(); it.hasNext();) {
+   final String p = it.next();
+   args[it.previousIndex() + 1] = p;
+   }
+   return args;
+   }
+
+   private static File findStreamTestFile(String name) throws Exception {
+   if (new File(name).exists()) {
+   return new File(name);
+   }
+   FileSystem fs = FileSystem.getLocalFileSystem();
+   String workingDir = fs.getWorkingDirectory().getPath();
+   if (!workingDir.endsWith(flinkPythonRltvPath)) {
+   workingDir += File.separator + flinkPythonRltvPath;
+   }
+   FileStatus[] status = fs.listStatus(
+   new Path( workingDir + File.separator + pathToStreamingTests));
+   for (FileStatus f : status) {
+   String file_name = f.getPath().getName();
--- End diff --

should be named `fileName`.




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121891478
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment get_execution_environment() {
+   return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution environment
+* will run the program in a multi-threaded fashion in the same JVM as the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
--- End diff --

the method parameter is named `config`.



[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121889042
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
+   final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
--- End diff --

by convention static final fields are upper case, as in 
`DEFAULT_PYTHON_SCRIPT_NAME`.
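Applied to the three fields quoted in this test, the convention could look like the sketch below. It also uses the conventional modifier order `private static final` and lowerCamelCase local names such as `fileName`. The class `PythonStreamBinderTestNaming` and the helper `defaultScriptPath`, which loosely mirrors the working-directory logic of `findStreamTestFile`, are hypothetical.

```java
import java.io.File;

/** Hypothetical illustration of Java naming conventions for the test's constants. */
public final class PythonStreamBinderTestNaming {

    // Constants: UPPER_SNAKE_CASE, modifiers in the conventional order "private static final".
    private static final String DEFAULT_PYTHON_SCRIPT_NAME = "run_all_tests.py";
    private static final String FLINK_PYTHON_RLTV_PATH = "flink-libraries/flink-streaming-python";
    private static final String PATH_TO_STREAMING_TESTS =
            "src/test/python/org/apache/flink/streaming/python/api";

    private PythonStreamBinderTestNaming() {
    }

    /**
     * Builds the expected path of the default test script, appending the module path
     * only if the working directory does not already point at it.
     */
    public static String defaultScriptPath(String workingDir) {
        // Local variables stay lowerCamelCase (baseDir, fileName), not snake_case.
        String baseDir = workingDir.endsWith(FLINK_PYTHON_RLTV_PATH)
                ? workingDir
                : workingDir + File.separator + FLINK_PYTHON_RLTV_PATH;
        String fileName = DEFAULT_PYTHON_SCRIPT_NAME;
        return baseDir + File.separator + PATH_TO_STREAMING_TESTS + File.separator + fileName;
    }
}
```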




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900689
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
+   }
+   }
+
+   public PyObject read (Kryo kryo, Input input, Class type) {
--- End diff --

remove space after `read`
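The `write` method quoted above frames each object as an `int` length followed by the serialized bytes, so the matching `read` has to consume exactly that layout. Below is a minimal sketch of both halves, assuming plain `java.io` `DataOutput`/`DataInput` in place of Kryo's `Output`/`Input`, and standard Java serialization in place of `SerializationUtils.serializeObject` (whose body is not shown in this diff). `LengthPrefixedFraming` is a hypothetical name, and unlike the quoted `write`, the sketch lets `IOException` propagate rather than catching it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical sketch of length-prefixed framing for serialized objects. */
public final class LengthPrefixedFraming {

    private LengthPrefixedFraming() {
    }

    /** Serializes {@code value} and writes it as [int length][payload bytes]. */
    public static void writeFrame(DataOutput out, Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream objectOut = new ObjectOutputStream(buffer)) {
            objectOut.writeObject(value);
        }
        byte[] payload = buffer.toByteArray();
        out.writeInt(payload.length); // length prefix
        out.write(payload);           // serialized bytes
    }

    /** Reads one frame written by {@link #writeFrame} and deserializes it. */
    public static Object readFrame(DataInput in) throws IOException, ClassNotFoundException {
        int length = in.readInt();
        if (length < 0) {
            throw new IOException("Negative frame length: " + length);
        }
        byte[] payload = new byte[length];
        in.readFully(payload); // reads exactly `length` bytes or throws EOFException
        try (ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return objectIn.readObject();
        }
    }
}
```

Keeping the length prefix means a reader can always find the start of the next frame, even if it cannot make sense of the current payload.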




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121888394
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
--- End diff --

this import can be removed, since `PythonStreamBinder` is in the same package as the test.




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900800
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
+   }
+   }
+
+   public PyObject read (Kryo kryo, Input input, Class type) {
+   int len = input.readInt();
+   byte[] serPo = new byte[len];
+   input.read(serPo);
+   PyObject po = null;
+   try {
+   po = (PyObject) SerializationUtils.deserializeObject(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

is it intended to silently continue here?
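A minimal sketch of a fail-fast alternative, under stated assumptions: plain Java serialization stands in for the PR's `SerializationUtils` helper, and `DataOutputStream`/`DataInputStream` stand in for Kryo's `Output`/`Input`, so `LengthPrefixedCodec` is a hypothetical class rather than the PR's code. It keeps the same length-prefixed layout but rethrows failures instead of printing them and returning `null`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the write/read pair under review (JDK only, no Kryo). */
final class LengthPrefixedCodec {

    private LengthPrefixedCodec() {}

    /** Writes a length prefix followed by the serialized bytes; failures propagate. */
    static void write(DataOutputStream out, Serializable value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
                oos.writeObject(value);
            }
            byte[] bytes = buffer.toByteArray();
            out.writeInt(bytes.length);
            out.write(bytes);
        } catch (IOException e) {
            // Fail instead of printing the stack trace and leaving a partial record behind.
            throw new UncheckedIOException("Could not serialize " + value, e);
        }
    }

    /** Reads a record produced by write(); never returns null on failure. */
    static Object read(DataInputStream in) {
        try {
            byte[] bytes = new byte[in.readInt()];
            // DataInputStream.read(byte[]) may return fewer bytes; readFully does not.
            in.readFully(bytes);
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ois.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Could not deserialize record", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of serialized record not found", e);
        }
    }
}
```

Since Kryo's `Serializer#write`/`read` do not declare checked exceptions, wrapping the cause in an unchecked exception (for example Kryo's own `KryoException`) is one way to apply the same idea in the real serializer.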




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900659
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
--- End diff --

remove space after `write`




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121890065
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
+   final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

Since we include this in the list of scripts to run, does that mean that we run all tests twice?
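If the answer is yes, one way to avoid it is to skip the driver script when appending the folder contents. A minimal sketch, assuming the folder listing yields paths directly comparable to the driver's absolute path; `TestArgs.buildArgs` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: builds the argument array with the driver script first, listed once. */
final class TestArgs {

    private TestArgs() {}

    static String[] buildArgs(String driverPath, List<String> scriptsInFolder) {
        List<String> args = new ArrayList<>();
        args.add(driverPath);
        for (String script : scriptsInFolder) {
            // Skip the driver itself so it is not passed (and possibly run) a second time.
            if (!script.equals(driverPath)) {
                args.add(script);
            }
        }
        return args.toArray(new String[0]);
    }
}
```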




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121888620
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
+   final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
--- End diff --

you should call `main` directly without referencing `this`; otherwise one would assume that it is a non-static method.




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121891397
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment get_execution_environment() {
+   return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution environment
+* will run the program in a multi-threaded fashion in the same JVM as the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
--- End diff --

this should refer to `set_parallelism`.



[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900122
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python UDF {@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it internally in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * This function is used internally by the Python thin wrapper layer over the streaming data
+ * functionality
+ */
+public class PythonOutputSelector implements OutputSelector {
+   private static final long serialVersionUID = 909266346633598177L;
+
+   private final byte[] serFun;
+   private transient OutputSelector fun;
+
+   public PythonOutputSelector(OutputSelector fun) throws IOException {
+   this.serFun = SerializationUtils.serializeObject(fun);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public Iterable select(PyObject value) {
+   if (this.fun == null) {
+   try {
+   this.fun = (OutputSelector) SerializationUtils.deserializeObject(this.serFun);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

we should fail here, and also in case of a `ClassNotFoundException`.
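A minimal sketch of lazy deserialization that fails on both exceptions. The generic holder `LazyDeserialized` is hypothetical, and plain Java serialization stands in for the PR's `SerializationUtils` (an assumption). Because `OutputSelector#select` cannot throw checked exceptions, both causes are wrapped in a `RuntimeException` so the task fails with the real cause rather than an NPE on the next call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical holder: ships serialized bytes, rebuilds the object on first use. */
final class LazyDeserialized<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] serialized;
    private transient T instance;

    LazyDeserialized(byte[] serialized) {
        this.serialized = serialized;
    }

    /** Convenience factory that serializes {@code value} up front. */
    static <V extends Serializable> LazyDeserialized<V> of(V value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new LazyDeserialized<>(buffer.toByteArray());
    }

    @SuppressWarnings("unchecked")
    T get() {
        if (instance == null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                instance = (T) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                // Fail right away instead of printing and continuing with a null instance.
                throw new RuntimeException("Could not deserialize the wrapped function", e);
            }
        }
        return instance;
    }
}
```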




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900972
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+/**
+ * A Python serialization schema, which implements {@link SerializationSchema}. It converts
+ * a {@code PyObject} into its serialized form.
+ */
+public class PythonSerializationSchema implements SerializationSchema {
+   private static final long serialVersionUID = -9170596504893036458L;
+
+   private final byte[] serSchema;
+   private transient SerializationSchema schema;
+
+   public PythonSerializationSchema(SerializationSchema schema) throws IOException {
+   this.serSchema = SerializationUtils.serializeObject(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(PyObject element) {
+   if (this.schema == null) {
+   try {
+   this.schema = (SerializationSchema)SerializationUtils.deserializeObject(this.serSchema);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

we should fail right here, as otherwise we get an NPE later on anyway.




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900761
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

is it intended to silently continue here?




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121898935
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonKeyedStream.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import org.apache.flink.streaming.python.api.functions.PythonReduceFunction;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+
+/**
+ * A thin wrapper layer over {@link KeyedStream}.
+ *
+ * A {@code PythonKeyedStream} represents a {@link PythonDataStream} on which operator state is
+ * partitioned by key using a provided {@link org.apache.flink.api.java.functions.KeySelector;}
+ */
+@Public
+public class PythonKeyedStream extends PythonDataStream> {
+
+   public PythonKeyedStream(KeyedStream stream) {
--- End diff --

we can remove the public modifier; this applies to the other variations of `PythonDataStream` as well, except `PythonDataStream` itself.
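A minimal sketch of the suggested visibility, using hypothetical class names rather than the PR's classes: the base wrapper keeps a public constructor, while the derived wrapper's constructor is package-private, so it can only be created from inside the package (here through a `keyBy()`-style method on the base type).

```java
import java.util.Objects;

/** Hypothetical base wrapper: its constructor stays public. */
class PyDataStreamSketch {
    protected final Object stream;

    public PyDataStreamSketch(Object stream) {
        this.stream = Objects.requireNonNull(stream);
    }

    /** Derived wrappers are created from within the package, e.g. by a transformation. */
    PyKeyedStreamSketch keyBy() {
        return new PyKeyedStreamSketch(stream);
    }
}

/** Hypothetical derived wrapper: package-private constructor, no public instantiation. */
class PyKeyedStreamSketch extends PyDataStreamSketch {
    PyKeyedStreamSketch(Object stream) {
        super(stream);
    }
}
```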




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121901070
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.io.IOException;
+
+/**
+ * A Python deserialization schema, which implements {@link DeserializationSchema}. It converts a
+ * serialized form of {@code PyObject} into its Java Object representation.
+ */
+public class PythonDeserializationSchema implements DeserializationSchema {
+   private static final long serialVersionUID = -9180596504893036458L;
+   private final TypeInformation resultType = TypeInformation.of(new TypeHint(){});
+
+   private final byte[] serSchema;
+   private transient DeserializationSchema schema;
+
+   public PythonDeserializationSchema(DeserializationSchema schema) throws IOException {
+   this.serSchema = SerializationUtils.serializeObject(schema);
+   }
+
+   @SuppressWarnings("unchecked")
+   public Object deserialize(byte[] message) throws IOException {
+   if (this.schema == null) {
+   try {
+   this.schema = (DeserializationSchema) SerializationUtils.deserializeObject(this.serSchema);
+   } catch (ClassNotFoundException e) {
+   e.printStackTrace();
--- End diff --

we should fail right here, as otherwise we get an NPE later on.
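Since `deserialize(byte[])` already declares `IOException`, one option is to wrap the `ClassNotFoundException` in an `IOException` instead of printing and swallowing it. A minimal sketch with a hypothetical `SchemaLoader` helper, where plain Java serialization stands in for the PR's `SerializationUtils`:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

/** Hypothetical helper: restores a serialized object, reporting every failure as IOException. */
final class SchemaLoader {

    private SchemaLoader() {}

    static Object load(byte[] serialized) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            return in.readObject();
        } catch (ClassNotFoundException e) {
            // The caller already declares IOException, so report the missing class through it
            // rather than continuing with a null schema.
            throw new IOException("Could not restore the wrapped DeserializationSchema", e);
        }
    }
}
```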




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121889286
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
+   final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
+   List<String> filesInTestPath = getFilesInFolder(testFullPath.getParent());
+
+   String[] args = new String[filesInTestPath.size() + 1];
+   args[0] = testFullPath.getAbsolutePath();
+
+   for (final ListIterator<String> it = filesInTestPath.listIterator(); it.hasNext();) {
+   final String p = it.next();
+   args[it.previousIndex() + 1] = p;
+   }
+   return args;
+   }
+
+   private static File findStreamTestFile(String name) throws Exception {
+   if (new File(name).exists()) {
+   return new File(name);
+   }
+   FileSystem fs = FileSystem.getLocalFileSystem();
+   String workingDir = fs.getWorkingDirectory().getPath();
+   if (!workingDir.endsWith(flinkPythonRltvPath)) {
+   workingDir += File.separator + flinkPythonRltvPath;
+   }
+   FileStatus[] status = fs.listStatus(
+   new Path( workingDir + File.separator + pathToStreamingTests));
+   for (FileStatus f : status) {
+   String file_name = f.getPath().getName();
+   if (file_name.equals(name)) {
+   return new File(f.getPath().getPath());
+   }
+   }
+   throw new FileNotFoundException();
+   }
+
+   private static List<String> getFilesInFolder(String path) {
--- End diff --

I would rename this to `getTestFilesInFolder` and change the signature to
accept a `File` and return a `List`.
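A minimal sketch of the suggested helper, under assumptions the review does not state: a "test file" is taken to be a regular file whose name starts with `test_` and ends with `.py`, and callers are assumed to want absolute paths in a stable, sorted order. The class name `TestFileLookup` is hypothetical; inside the test the method would simply replace `getFilesInFolder`.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical holder for a sketch of the suggested getTestFilesInFolder(File) signature. */
final class TestFileLookup {

    private TestFileLookup() {
    }

    /**
     * Returns the absolute paths of the Python test scripts directly inside {@code folder},
     * sorted by path. Assumption: a test script is a regular file named "test_*.py".
     */
    static List<String> getTestFilesInFolder(File folder) {
        List<String> result = new ArrayList<>();
        File[] entries = folder.listFiles();
        if (entries == null) {
            // Not a directory, or an I/O error occurred while listing it.
            return result;
        }
        // File implements Comparable, so this sorts by path name for a deterministic order.
        Arrays.sort(entries);
        for (File f : entries) {
            String name = f.getName();
            if (f.isFile() && name.startsWith("test_") && name.endsWith(".py")) {
                result.add(f.getAbsolutePath());
            }
        }
        return result;
    }
}
```

With this signature, `prepareDefaultArgs()` would pass `testFullPath.getParentFile()` instead of `testFullPath.getParent()`. Returning an empty list for a non-directory keeps the caller simple; throwing a `FileNotFoundException`, as `findStreamTestFile` does, would be an equally reasonable choice.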


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121887994
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,104 @@
+
+xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.4-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_${scala.binary.version}
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   jar-with-dependencies
+   
+   
+   
+   true
+   org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   flink-streaming-java_${scala.binary.version}
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   flink-runtime_${scala.binary.version}
+   ${project.version}
+provided
+   
+   
+   org.python
+   jython-standalone
+   2.7.0
+   
+   
+   org.apache.flink
+   flink-connector-kafka-0.9_2.10
--- End diff --

Replace `2.10` with `${scala.binary.version}`.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121894570
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition, it takes
+* care of registering the required Jython serializers.
+*
+* @return The python execution environment of the context in which the program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment get_execution_environment() {
+   return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution environment
+* will run the program in a multi-threaded fashion in the same JVM as the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  Pass a custom configuration into the cluster
+* @return A local execution environment with the specified parallelism.
+*/
+   public static 

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121888454
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
--- End diff --

These modifiers should be re-ordered to `private static final`.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121899652
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in a python user-defined
+ * function (UDF).
+ * The fact that this field is not set results in a dynamic calculation of this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class is created. Its name is
+ * constructed using the following pattern:
+ * {@code org.python.proxies.$$}. The {@code }
+ * part is increased by one at runtime, for every job submission. It results in a different serial
+ * version UID for each run of the same Python class. Therefore, it is required to silently
+ * suppress the serial version UID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
+
+   public PythonObjectInputStream2(InputStream in) throws IOException {
+   super(in);
+   }
+
+   protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
+   ObjectStreamClass resultClassDescriptor = super.readClassDescriptor(); // initially streams descriptor
+
+   Class localClass;
+   try {
+   localClass = resolveClass(resultClassDescriptor);
+   } catch (ClassNotFoundException e) {
+   System.out.println("No local class for " + resultClassDescriptor.getName());
--- End diff --

Remove the `println`. If an exception is thrown here, what exactly does that mean
for the job execution? I.e., why don't we fail completely here?
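One way to address this is to drop the `println` and let the `ClassNotFoundException` propagate, so the task fails instead of continuing with a descriptor it cannot resolve. The sketch below is an illustration under stated assumptions, not the PR's code: it extends plain `java.io.ObjectInputStream` rather than Jython's `PythonObjectInputStream` so that it compiles with the JDK alone, and the class name `LenientUidObjectInputStream` is hypothetical. It keeps the UID-mismatch suppression described in the class Javadoc by returning the local class descriptor when only the `serialVersionUID` differs.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

/**
 * Hypothetical sketch: tolerates serialVersionUID mismatches, but fails fast when no local
 * class exists (no println, no swallowed exception). Extends the JDK ObjectInputStream
 * instead of Jython's PythonObjectInputStream so it compiles without Jython.
 */
class LenientUidObjectInputStream extends ObjectInputStream {

    LenientUidObjectInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
        ObjectStreamClass streamDesc = super.readClassDescriptor();
        // Let ClassNotFoundException propagate: without a local class the record cannot be
        // deserialized, so failing the task is preferable to printing and continuing.
        Class<?> localClass = resolveClass(streamDesc);
        ObjectStreamClass localDesc = ObjectStreamClass.lookup(localClass);
        if (localDesc != null && localDesc.getSerialVersionUID() != streamDesc.getSerialVersionUID()) {
            // Same class name, different computed UID (e.g. a regenerated proxy class):
            // hand back the local descriptor so the UID comparison in readObject() passes.
            return localDesc;
        }
        return streamDesc;
    }
}
```

Returning the local descriptor is only safe when the serialized and local class layouts actually match, which is the premise for the regenerated Jython proxies; if the fields differ, deserialization can fail or yield wrong field values, which is another argument for failing loudly in every other case.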


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121888058
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,104 @@
+
+xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.4-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_${scala.binary.version}
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   jar-with-dependencies
+   
+   
+   
+   true
+   org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   flink-streaming-java_${scala.binary.version}
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   flink-runtime_${scala.binary.version}
+   ${project.version}
+provided
+   
+   
+   org.python
+   jython-standalone
+   2.7.0
+   
+   
+   org.apache.flink
+   flink-connector-kafka-0.9_2.10
+   1.2.0
--- End diff --

Replace the version with `${project.version}`.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121901283
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.util.Collector;
+import org.python.core.PyObject;
+
+/**
+ * Collects a {@code PyObject} record and forwards it. It makes sure that the record is converted,
+ * if necessary, to a {@code PyObject}.
+ */
+@Public
+public class PythonCollector implements Collector<PyObject> {
+   private Collector<PyObject> collector;
+
+   public void setCollector(Collector<PyObject> collector) {
+   this.collector = collector;
+   }
+
+   @Override
+   public void collect(PyObject record) {
+   PyObject po = UtilityFunctions.adapt(record);
--- End diff --

Am I missing something, or is this call unnecessary? Isn't it guaranteed
that `record instanceof PyObject` is `true`?
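If `record` is indeed statically typed as `PyObject`, the collector could forward it unchanged. A minimal sketch of that simplification, assuming `UtilityFunctions.adapt` performs no value conversion beyond what the static type already guarantees (its body is not part of this diff). `SimpleCollector` is a hypothetical stand-in for `org.apache.flink.util.Collector` (which declares `collect(T)` and `close()`) so the example compiles without Flink; the type parameter `T` plays the role of `PyObject`.

```java
/** Hypothetical stand-in for org.apache.flink.util.Collector, so the sketch compiles without Flink. */
interface SimpleCollector<T> {
    void collect(T record);

    void close();
}

/**
 * Hypothetical forwarding collector. For callers without unchecked casts, every record passed
 * to collect(T) is already a T, so it is forwarded without any conversion step.
 */
final class ForwardingCollector<T> implements SimpleCollector<T> {
    private SimpleCollector<T> downstream;

    void setCollector(SimpleCollector<T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void collect(T record) {
        // No adapt() step: the static type already guarantees the record's type.
        downstream.collect(record);
    }

    @Override
    public void close() {
        downstream.close();
    }
}
```

One caveat worth checking before removing `adapt`: `PythonCollector` implements `Collector<PyObject>`, so the compiler also emits an erased `collect(Object)` bridge method that casts its argument to `PyObject`. A caller that reaches that bridge with a non-`PyObject` value would get a `ClassCastException`; whether Jython's overload resolution can ever pick it is not visible in this diff.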


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117917812
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,99 @@
+
+xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_2.10
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   jar-with-dependencies
+   
+   
+   
+   true
+   org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
--- End diff --

Otherwise we include them in the jar-with-dependencies and have them 
multiple times on the classpath.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117917576
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,99 @@
+
+xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_2.10
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   jar-with-dependencies
+   
+   
+   
+   true
+   org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
--- End diff --

argh, let me try again: They should be marked as `provided`.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117864592
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,99 @@
+
+xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_2.10
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   jar-with-dependencies
+   
+   
+   
+   true
+   org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
--- End diff --

I'm not sure what you mean. Please explain.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117807878
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -233,6 +234,20 @@ boolean holdsStillReference(String name, JobID jobId) {
//  Utilities
// 

 
+   /**
+* Removes the given path recursively, if it exists. The path can be a file path or a directory.
+*
+* @param path  The root path to remove.
+* @throws IOException
+* @throws URISyntaxException
+*/
+   public static void clearPath(String path) throws IOException, URISyntaxException {
--- End diff --

My bad! I don't use it anymore.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117806756
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+   private final StreamExecutionEnvironment env;
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition, it takes
+* care of registering the required Jython serializers.
+*
+* @return The python execution environment of the context in which the program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment get_execution_environment() {
+   return new PythonStreamExecutionEnvironment();
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution environment
+* will run the program in a multi-threaded fashion in the same JVM as the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  Pass a custom configuration into the cluster
+* @return A local execution environment with the specified parallelism.
+*/
+   public static PythonStreamExecutionEnvironment 

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117802052
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in a python user-defined
+ * function (UDF).
+ * The fact that this field is not set results in a dynamic calculation of this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class is created. Its name is
+ * constructed using the following pattern:
+ * {@code org.python.proxies.$$}. The {@code }
+ * part is increased by one at runtime, for every job submission. It results in a different serial
+ * version UID for each run of the same Python class. Therefore, it is required to silently
+ * suppress the serial version UID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
--- End diff --

Ah, I didn't see that; I figured it was a leftover from development.
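To make the Javadoc's premise concrete: a `Serializable` class without an explicit `serialVersionUID` gets one computed from a SHA hash of its structure (name, modifiers, interfaces, fields, methods), whereas a declared field pins the value. The class names below are hypothetical and the sketch uses only `java.io.ObjectStreamClass`. Because a Jython proxy's class name changes with every submission and Python code cannot declare the field, its computed UID changes too, which is what `PythonObjectInputStream2` works around.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** No explicit serialVersionUID: the JVM derives one from a SHA hash of the class structure. */
class ImplicitUid implements Serializable {
    int count;
}

/** Explicit serialVersionUID: the value does not change when members are added or removed. */
class ExplicitUid implements Serializable {
    private static final long serialVersionUID = 1L;
    int count;
}

final class SerialUidDemo {
    public static void main(String[] args) {
        // Derived from the class name, modifiers, interfaces, fields and methods; a class with a
        // different name (such as a renumbered proxy class) almost certainly gets a different value.
        long implicitUid = ObjectStreamClass.lookup(ImplicitUid.class).getSerialVersionUID();
        // Declared explicitly, so this is exactly 1.
        long explicitUid = ObjectStreamClass.lookup(ExplicitUid.class).getSerialVersionUID();
        System.out.println("implicit: " + implicitUid);
        System.out.println("explicit: " + explicitUid);
    }
}
```

Running `main` prints an arbitrary-looking 64-bit value for `ImplicitUid` and `explicit: 1` for `ExplicitUid`.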


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117793843
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+   private final StreamExecutionEnvironment env;
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+
+   /**
+    * A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition,
+    * it takes care of registering the required Jython serializers.
+    *
+    * @return The Python execution environment of the context in which the program is
+    * executed.
+    */
+   public static PythonStreamExecutionEnvironment get_execution_environment() {
+       return new PythonStreamExecutionEnvironment();
+   }
+
+   /**
+    * Creates a {@link LocalStreamEnvironment}. The local execution environment
+    * will run the program in a multi-threaded fashion in the same JVM in which the
+    * environment was created. The default parallelism of the local
+    * environment is the number of hardware contexts (CPU cores / threads),
+    * unless it was specified differently by {@link #setParallelism(int)}.
+    *
+    * @param configuration
+    *          Pass a custom configuration into the cluster
+    * @return A local execution environment with the specified parallelism.
+    */
+   public static PythonStreamExecutionEnvironment 

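The javadocs quoted above describe a thin wrapper that delegates to `StreamExecutionEnvironment`, registers Jython serializers when it is created, and (for the local variant) defaults the parallelism to the number of hardware contexts. The plain-Java sketch below illustrates that delegation pattern without a Flink dependency. `InnerEnvironment`, `PythonEnvironmentSketch`, `create_local_execution_environment`, and the string-based serializer registry are hypothetical stand-ins, not Flink or PR APIs; the snake_case names only mirror the Python-facing style of `get_execution_environment()`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the wrapped Java environment; NOT Flink's StreamExecutionEnvironment.
class InnerEnvironment {
    private int parallelism;
    // Type name -> serializer name; an illustrative stand-in for a real serializer registry.
    private final Map<String, String> serializers = new LinkedHashMap<>();

    void setParallelism(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        this.parallelism = parallelism;
    }

    int getParallelism() {
        return parallelism;
    }

    void registerSerializer(String typeName, String serializerName) {
        serializers.put(typeName, serializerName);
    }

    Map<String, String> getSerializers() {
        return Collections.unmodifiableMap(serializers);
    }
}

// Hypothetical thin wrapper: snake_case methods that only delegate, plus one-time
// serializer registration in the constructor.
class PythonEnvironmentSketch {
    private final InnerEnvironment env;

    private PythonEnvironmentSketch(InnerEnvironment env) {
        this.env = env;
        // Assumption: illustrative type/serializer names, registered once per environment.
        env.registerSerializer("PyObject", "PyObjectSerializer");
    }

    // Local environment whose default parallelism is the number of hardware contexts.
    static PythonEnvironmentSketch create_local_execution_environment() {
        return create_local_execution_environment(Runtime.getRuntime().availableProcessors());
    }

    static PythonEnvironmentSketch create_local_execution_environment(int parallelism) {
        InnerEnvironment inner = new InnerEnvironment();
        inner.setParallelism(parallelism);
        return new PythonEnvironmentSketch(inner);
    }

    void set_parallelism(int parallelism) {
        env.setParallelism(parallelism);
    }

    int get_parallelism() {
        return env.getParallelism();
    }

    Map<String, String> registered_serializers() {
        return env.getSerializers();
    }
}
```

Keeping the wrapper free of logic of its own (everything is forwarded) is what makes it "thin": behavior stays defined by the wrapped Java environment, and only naming and setup differ.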
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117788892
  
--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java ---
@@ -0,0 +1,68 @@
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in a Python user-defined
+ * function (UDF).
+ * Because this field is not set, the serialVersionUID is calculated dynamically using SHA, so
+ * that it is very likely to be unique. This number is a 64-bit hash of the class name,
+ * interface class names, methods, and fields. If a Python class inherits from a Java class,
+ * as in the case of Python UDFs, then a proxy wrapper class is created. Its name is
+ * constructed using the following pattern:
+ * {@code org.python.proxies.$$}. The {@code }
+ * part is increased by one at runtime, for every job submission. This results in a different
+ * serial version UID for each run of the same Python class. Therefore, the serial version UID
+ * mismatch check must be silently suppressed.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
--- End diff --

What name would you suggest? It extends `PythonObjectInputStream`.
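The javadoc quoted above explains why the serialVersionUID check has to be suppressed. A common plain-Java way to do that is to override `ObjectInputStream.readClassDescriptor()` and substitute the local class descriptor when the UIDs differ. The sketch below shows that technique in isolation. It extends `java.io.ObjectInputStream` rather than Jython's `PythonObjectInputStream`, so it is not necessarily what `PythonObjectInputStream2` does, and `LenientObjectInputStream`, `Payload`, and `SuidDemo` are hypothetical names. Swapping descriptors is only safe when the local and serialized class layouts really are compatible.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical name: an ObjectInputStream that ignores serialVersionUID mismatches.
class LenientObjectInputStream extends ObjectInputStream {

    LenientObjectInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
        ObjectStreamClass streamDesc = super.readClassDescriptor();
        Class<?> localClass;
        try {
            localClass = Class.forName(streamDesc.getName(), false, getClass().getClassLoader());
        } catch (ClassNotFoundException e) {
            // Unknown locally: keep the stream descriptor and let resolveClass report it.
            return streamDesc;
        }
        ObjectStreamClass localDesc = ObjectStreamClass.lookup(localClass);
        if (localDesc != null && localDesc.getSerialVersionUID() != streamDesc.getSerialVersionUID()) {
            // UIDs differ: hand back the local descriptor so the UID check passes.
            return localDesc;
        }
        return streamDesc;
    }
}

// Hypothetical payload with an explicit UID, used to demonstrate a mismatch.
class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    final String word;
    final int count;

    Payload(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

final class SuidDemo {
    private SuidDemo() {}

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // Simulates a changed UID (as with a renumbered proxy class) by overwriting the
    // 8-byte big-endian UID that directly follows the class name in the stream.
    static byte[] withPatchedUid(byte[] bytes, String className, long newUid) {
        byte[] name = className.getBytes(StandardCharsets.UTF_8);
        byte[] out = bytes.clone();
        for (int i = 0; i + name.length + 8 <= out.length; i++) {
            boolean match = true;
            for (int j = 0; j < name.length && match; j++) {
                match = out[i + j] == name[j];
            }
            if (match) {
                int pos = i + name.length;
                for (int k = 7; k >= 0; k--) {
                    out[pos + k] = (byte) newUid;
                    newUid >>>= 8;
                }
                return out;
            }
        }
        throw new IllegalArgumentException("class name not found in stream");
    }
}
```

A plain `ObjectInputStream` rejects the patched bytes with an `InvalidClassException`, while `LenientObjectInputStream` reads them back.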



[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117722927
  
--- Diff: docs/dev/stream/python.md ---
@@ -0,0 +1,649 @@
+---
+title: "Python Programming Guide (Streaming)"
+is_beta: true
+nav-title: Python API
+nav-parent_id: streaming
+nav-pos: 63
+---
+
+
+Streaming analysis programs in Flink are regular programs that implement transformations on
+streaming data sets (e.g., filtering, mapping, joining, grouping). The streaming data sets are initially
+created from certain sources (e.g., by reading from Apache Kafka, from files, or from collections).
+Results are returned via sinks, which may, for example, write the data to (distributed) files or to
+standard output (for example, the command line terminal). Flink streaming programs run in a variety
+of contexts: standalone, or embedded in other programs. The execution can happen in a local JVM or
+on clusters of many machines.
+
+In order to create your own Flink streaming program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Jython Framework
+----------------
+The Flink Python streaming API uses the Jython framework (see )
+to drive the execution of a given script. The Python streaming layer is actually a thin wrapper layer over the
+existing Java streaming APIs.
+
+#### Constraints
+There are two main constraints for using Jython:
+
+* The latest supported Python version is 2.7
+* It is not straightforward to use Python C extensions
+
+Streaming Program Example
+-------------------------
+The following streaming program is a complete, working example of WordCount. You can copy & paste the code
+to run it locally (see the notes later in this section). It counts the occurrences of each word (case-insensitive)
+in a stream of sentences, over a window of 50 milliseconds, and prints the results to the standard output.
+
+{% highlight python %}
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.python.api.environment import PythonStreamExecutionEnvironment
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+
+class Generator(SourceFunction):
+    def __init__(self, num_iters):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            ctx.collect('Hello World')
+            counter += 1
+
+    def cancel(self):
+        self._running = False
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+def main():
+    env = PythonStreamExecutionEnvironment.get_execution_environment()
+    env.create_python_source(Generator(num_iters=1000)) \
+        .flat_map(Tokenizer()) \
+        .key_by(Selector()) \
+        .time_window(milliseconds(50)) \
+        .reduce(Sum()) \
+        .print()
+    env.execute()
+
+
+if __name__ == '__main__':
+    main()
+{% endhighlight %}
+
+**Notes:**
+
+- If execution is done on a local cluster, you may replace the last line in the `main()` function
+  with **`env.execute(True)`**
+- Execution on a multi-node cluster requires a shared storage medium (e.g., HDFS), which needs to be
+  configured upfront.
+- The output of the given script is directed to the standard output. Consequently, the output
+  is written to the corresponding worker's `.out` file. If the script is executed inside the IntelliJ IDE,
+  then the output is displayed in the console tab.
+
+{% top %}
+
+Program Skeleton
+----------------
+As we already saw in the example, Flink streaming programs look like regular Python programs.
+Each program consists of the same basic parts:
+
+1. A `main()` function definition, without 

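To make the semantics of the WordCount example in the quoted guide concrete, here is a plain-Java sketch (no Flink dependency) of what the pipeline computes: lower-case and split each sentence, key by the word, assign each occurrence to a 50 ms tumbling window, and sum the counts per window. `TimedSentence`, `WindowedWordCount`, and the explicit timestamps are assumptions for illustration; in the actual job, `time_window(milliseconds(50))` window assignment is done by Flink according to the job's time characteristic, not by timestamps stored in the elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical input element: a sentence and the time (in ms) it is assigned to.
final class TimedSentence {
    final long timestampMs;
    final String text;

    TimedSentence(long timestampMs, String text) {
        this.timestampMs = timestampMs;
        this.text = text;
    }
}

final class WindowedWordCount {
    private WindowedWordCount() {}

    // Mirrors Tokenizer.flatMap: lower-case, then split on runs of whitespace.
    static List<String> tokenize(String sentence) {
        List<String> words = new ArrayList<>();
        for (String w : sentence.toLowerCase(Locale.ROOT).split("\\s+")) {
            if (!w.isEmpty()) { // a leading separator yields one empty token
                words.add(w);
            }
        }
        return words;
    }

    // Returns windowStart -> (word -> count) for tumbling windows of windowMs.
    static Map<Long, Map<String, Integer>> count(List<TimedSentence> input, long windowMs) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (TimedSentence s : input) {
            // Tumbling-window assignment: round the timestamp down to a multiple of windowMs.
            long windowStart = s.timestampMs - Math.floorMod(s.timestampMs, windowMs);
            Map<String, Integer> perWord = result.computeIfAbsent(windowStart, k -> new TreeMap<>());
            for (String word : tokenize(s.text)) {
                // Key by word (Selector) and sum the 1s (Sum reducer).
                perWord.merge(word, 1, Integer::sum);
            }
        }
        return result;
    }
}
```

For example, sentences at 0 ms and 10 ms fall into the window starting at 0, while a sentence at 55 ms starts a new window at 50.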
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724977
  
--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---
@@ -0,0 +1,442 @@

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117725288
  
--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---
@@ -0,0 +1,442 @@

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724604
  
--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java ---
@@ -0,0 +1,68 @@
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in a Python user-defined
+ * function (UDF).
+ * Because this field is not set, the serialVersionUID is calculated dynamically using SHA, so
+ * that it is very likely to be unique. This number is a 64-bit hash of the class name,
+ * interface class names, methods, and fields. If a Python class inherits from a Java class,
+ * as in the case of Python UDFs, then a proxy wrapper class is created. Its name is
+ * constructed using the following pattern:
+ * {@code org.python.proxies.$$}. The {@code }
+ * part is increased by one at runtime, for every job submission. This results in a different
+ * serial version UID for each run of the same Python class. Therefore, the serial version UID
+ * mismatch check must be silently suppressed.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
--- End diff --

remove `2` from name.



[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117725620
  
--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---
@@ -0,0 +1,442 @@

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117726359
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -233,6 +234,20 @@ boolean holdsStillReference(String name, JobID jobId) {
//  Utilities
// 

 
+   /**
+    * Removes the given path recursively, if it exists. The path can be a file path or a directory.
+    *
+    * @param path  The root path to remove.
+    * @throws IOException
+    * @throws URISyntaxException
+    */
+   public static void clearPath(String path) throws IOException, URISyntaxException {
--- End diff --

I thought we decided against adding this method?
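For comparison, the behavior the javadoc describes (recursively remove a file or directory if it exists, otherwise do nothing) can be sketched with plain `java.nio.file`. This is an illustration only: it takes a local `java.nio.file.Path` instead of the `String` argument and URI handling of the method in the diff, it does not go through Flink's `FileSystem` abstraction, and `PathCleaner.clearPath` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

final class PathCleaner {
    private PathCleaner() {}

    // Deletes a file, or a directory and everything below it; a missing path is a no-op.
    static void clearPath(Path path) throws IOException {
        if (!Files.exists(path, LinkOption.NOFOLLOW_LINKS)) {
            return;
        }
        // walkFileTree does not follow symbolic links by default, so a link is deleted, not its target.
        Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                // All children have been removed at this point, so the directory is empty.
                Files.delete(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
```

Deleting in `postVisitDirectory` (depth-first) is what allows a single walk to remove a whole tree, since a directory can only be deleted once it is empty.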



[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117723487
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,99 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_2.10
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   
jar-with-dependencies
+   
+   
+   
+   
true
+   
org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
--- End diff --

all flink dependencies should be marked as `´.



[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724247
  
--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java ---
@@ -0,0 +1,262 @@
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import org.apache.flink.streaming.python.api.functions.PythonFilterFunction;
+import org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonKeySelector;
+import org.apache.flink.streaming.python.api.functions.PythonMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonOutputSelector;
+import org.apache.flink.streaming.python.api.functions.PythonSinkFunction;
+import org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+
+/**
+ * A {@code PythonDataStream} is a thin wrapper layer over {@link DataStream}, which represents a
+ * stream of elements of the same type. A {@code PythonDataStream} can be transformed into
+ * another {@code PythonDataStream} by applying various transformation functions, such as
+ *
+ * {@link PythonDataStream#map}
+ * {@link PythonDataStream#split}
+ *
+ *
+ * A thin wrapper layer means that the functionality itself is performed by the
+ * {@link DataStream}, however instead of working directly with the streaming data sets,
+ * this layer handles Python wrappers (e.g. {@code PythonDataStream}) to comply with the
+ * Python standard coding styles.
+ */
+@Public
+public class PythonDataStream> {
+    protected final D stream;
+
+    public PythonDataStream(D stream) {
+        this.stream = stream;
+    }
+
+    /**
+     * A thin wrapper layer over {@link DataStream#union(DataStream[])}.
+     *
+     * @param streams
+     *            The Python DataStreams to union output with.
+     * @return The {@link PythonDataStream}.
+     */
+    @SafeVarargs
+    @SuppressWarnings("unchecked")
+    public final PythonDataStream union(PythonDataStream... streams) {
+        ArrayList dsList = new ArrayList<>();
+        for (PythonDataStream ps : streams) {
+            dsList.add(ps.stream);
+        }
+        DataStream[] dsArray = new DataStream[dsList.size()];
+        return new PythonDataStream(stream.union(dsList.toArray(dsArray)));
+    }
+
+    /**
+     * A thin wrapper layer over {@link DataStream#split(OutputSelector)}.
+     *
+     * @param output_selector
+     *            The user defined {@link OutputSelector} for directing the tuples.
+     * @return The {@link PythonSplitStream}
+     */
+    public PythonSplitStream split(OutputSelector output_selector) throws IOException {
+        return new PythonSplitStream(this.stream.split(new PythonOutputSelector(output_selector)));
+    }
+
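The "thin wrapper" idea described in the `PythonDataStream` Javadoc can be illustrated in plain Python. This is a minimal sketch, not Flink code: `CoreStream` is a hypothetical stand-in for the Java `DataStream` (camelCase methods that do the actual work), and `PyStream` plays the role of `PythonDataStream`, exposing snake_case names and doing nothing but delegating and re-wrapping. Its `union` unwraps each argument, much as the Java `union` above collects `ps.stream`. Unlike a real Flink union, which gives no ordering guarantee between its inputs, this sketch simply concatenates.

```python
class CoreStream(object):
    """Hypothetical engine-side stream (stand-in for the Java DataStream)."""

    def __init__(self, elements):
        self.elements = list(elements)

    def flatMap(self, fn):
        # camelCase, Java-style name: this class does the real work.
        out = []
        for e in self.elements:
            out.extend(fn(e))
        return CoreStream(out)

    def union(self, *others):
        merged = list(self.elements)
        for other in others:
            merged.extend(other.elements)
        return CoreStream(merged)


class PyStream(object):
    """Hypothetical thin wrapper (plays the role of PythonDataStream)."""

    def __init__(self, stream):
        self.stream = stream  # the wrapped CoreStream

    def flat_map(self, fn):
        # Delegate, then re-wrap so calls keep chaining in Python style.
        return PyStream(self.stream.flatMap(fn))

    def union(self, *streams):
        # Unwrap every PyStream argument before delegating.
        return PyStream(self.stream.union(*[ps.stream for ps in streams]))


a = PyStream(CoreStream(["hello world"]))
b = PyStream(CoreStream(["hello flink"]))
words = a.union(b).flat_map(lambda s: s.split())
print(words.stream.elements)  # ['hello', 'world', 'hello', 'flink']
```

The wrapper holds no logic of its own, so every behavioral question is answered by the wrapped stream; the wrapper only decides naming and return types.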
   

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117723303
  
--- Diff: 
flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
 ---
@@ -16,12 +16,12 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 

-from flink.plan.Environment import get_environment
-from flink.functions.MapFunction import MapFunction
+from flink.functions.Aggregation import Max, Min, Sum
+from flink.functions.CoGroupFunction import CoGroupFunction
 from flink.functions.CrossFunction import CrossFunction
 from flink.functions.JoinFunction import JoinFunction
-from flink.functions.CoGroupFunction import CoGroupFunction
-from flink.functions.Aggregation import Max, Min, Sum
+from flink.functions.MapFunction import MapFunction
+from flink.plan.Environment import get_environment
--- End diff --

should be reverted


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724857
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ *
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@Public
--- End diff --

mark as `@PublicEvolving`.




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117723223
  
--- Diff: flink-libraries/flink-python/pom.xml ---
@@ -59,20 +59,32 @@ under the License.
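 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>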
--- End diff --

I guess these changes should be reverted.




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724828
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ *
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+   private final StreamExecutionEnvironment env;
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
--- End diff --

Static fields should be declared above non-static fields.




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117722682
  
--- Diff: docs/dev/stream/python.md ---
@@ -0,0 +1,649 @@
+---
+title: "Python Programming Guide (Streaming)"
+is_beta: true
+nav-title: Python API
+nav-parent_id: streaming
+nav-pos: 63
+---
+
+
+Streaming analysis programs in Flink are regular programs that implement transformations on
+streaming data sets (e.g., filtering, mapping, joining, grouping). The streaming data sets are initially
+created from sources (e.g., by reading from Apache Kafka, reading files, or from collections).
+Results are returned via sinks, which may, for example, write the data to (distributed) files or to
+standard output (for example, the command-line terminal). Flink streaming programs run in a variety
+of contexts: standalone, or embedded in other programs. Execution can happen in a local JVM or
+on clusters of many machines.
+
+To create your own Flink streaming program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections serve as references for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Jython Framework
+---
+The Flink Python streaming API uses the Jython framework (see )
+to drive the execution of a given script. The Python streaming layer is a thin wrapper over the
+existing Java streaming APIs.
+
+### Constraints
+There are two main constraints for using Jython:
+
+* The latest supported Python version is 2.7
+* It is not straightforward to use Python C extensions
+
+Streaming Program Example
+-
+The following streaming program is a complete, working example of WordCount. You can copy & paste the code
+to run it locally (see notes later in this section). It counts the occurrences of each word (case-insensitive)
+in a stream of sentences over tumbling windows of 50 milliseconds and prints the results to standard output.
+
+{% highlight python %}
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.python.api.jython import PythonStreamExecutionEnvironment
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+
+class Generator(SourceFunction):
+    def __init__(self, num_iters):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            ctx.collect('Hello World')
+            counter += 1
+
+    def cancel(self):
+        self._running = False
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+
+def main():
+    env = PythonStreamExecutionEnvironment.get_execution_environment()
+    env.create_python_source(Generator(num_iters=1000)) \
+        .flat_map(Tokenizer()) \
+        .key_by(Selector()) \
+        .time_window(milliseconds(50)) \
+        .reduce(Sum()) \
+        .print()
+    env.execute()
+
+
+if __name__ == '__main__':
+    main()
+{% endhighlight %}
+
+**Notes:**
+
+- If execution is done on a local cluster, you may replace the last line in the `main()` function
+  with **`env.execute(True)`**
+- Execution on a multi-node cluster requires shared storage (e.g. HDFS), which needs to be
+  configured upfront.
+- The output of the given script is directed to the standard output. Consequently, the output
+  is written to the corresponding worker `.out` filed. If the script is executed inside the IntelliJ IDE,

typo: filed -> file
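To sanity-check what the WordCount example in this diff should emit, its logic can be simulated in plain CPython without Flink or Jython. This is a sketch under stated assumptions: the hypothetical `events` list carries explicit millisecond timestamps, whereas the real job's `time_window(milliseconds(50))` (processing time by default in Flink of this era) assigns elements by wall-clock arrival, so actual window contents depend on timing. Windows are modeled as tumbling and aligned to multiples of the window size; `tokenize`, `window_start` and `windowed_word_count` are illustrative names, not Flink API.

```python
from collections import defaultdict

# Hypothetical window size matching milliseconds(50) in the WordCount example.
WINDOW_MS = 50


def tokenize(sentence):
    # Same logic as Tokenizer.flatMap: lower-case, split on whitespace, emit (1, word).
    return [(1, word) for word in sentence.lower().split()]


def window_start(ts_ms, size_ms=WINDOW_MS):
    # Tumbling windows are aligned to multiples of the window size.
    return ts_ms - (ts_ms % size_ms)


def windowed_word_count(events, size_ms=WINDOW_MS):
    """Count words per tumbling window.

    events: iterable of (timestamp_ms, sentence) pairs (an assumption; the
    real job does not take explicit timestamps).
    Returns a dict mapping (window_start_ms, word) -> count.
    """
    counts = defaultdict(int)
    for ts, sentence in events:
        for one, word in tokenize(sentence):
            # key_by(Selector) + time_window + reduce(Sum) boil down to
            # one running sum per (window, word) pair.
            counts[(window_start(ts, size_ms), word)] += one
    return dict(counts)


events = [(3, 'Hello World'), (20, 'hello'), (55, 'World')]
print(sorted(windowed_word_count(events).items()))
# [((0, 'hello'), 2), ((0, 'world'), 1), ((50, 'world'), 1)]
```

One difference from the real job: Flink emits each window's result when that window fires, while this sketch returns all windows at once.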



[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117726281
  
--- Diff: flink-libraries/pom.xml ---
@@ -52,7 +53,7 @@ under the License.
 		<dependency>
 			<groupId>org.slf4j</groupId>
 			<artifactId>slf4j-api</artifactId>
-			<scope>provided</scope>
--- End diff --

This should be reverted.




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724239
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import org.apache.flink.streaming.python.api.functions.PythonFilterFunction;
+import org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonKeySelector;
+import org.apache.flink.streaming.python.api.functions.PythonMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonOutputSelector;
+import org.apache.flink.streaming.python.api.functions.PythonSinkFunction;
+import org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+
+/**
+ * A {@code PythonDataStream} is a thin wrapper layer over {@link DataStream}, which represents a
+ * stream of elements of the same type. A {@code PythonDataStream} can be transformed into
+ * another {@code PythonDataStream} by applying various transformation functions, such as
+ *
+ * {@link PythonDataStream#map}
+ * {@link PythonDataStream#split}
+ *
+ *
+ * A thin wrapper layer means that the functionality itself is performed by the
+ * {@link DataStream}, however instead of working directly with the streaming data sets,
+ * this layer handles Python wrappers (e.g. {@code PythonDataStream}) to comply with the
+ * Python standard coding styles.
+ */
+@Public
+public class PythonDataStream> {
+    protected final D stream;
+
+    public PythonDataStream(D stream) {
+        this.stream = stream;
+    }
+
+    /**
+     * A thin wrapper layer over {@link DataStream#union(DataStream[])}.
+     *
+     * @param streams
+     *            The Python DataStreams to union output with.
+     * @return The {@link PythonDataStream}.
+     */
+    @SafeVarargs
+    @SuppressWarnings("unchecked")
+    public final PythonDataStream union(PythonDataStream... streams) {
+        ArrayList dsList = new ArrayList<>();
+        for (PythonDataStream ps : streams) {
+            dsList.add(ps.stream);
+        }
+        DataStream[] dsArray = new DataStream[dsList.size()];
+        return new PythonDataStream(stream.union(dsList.toArray(dsArray)));
+    }
+
+    /**
+     * A thin wrapper layer over {@link DataStream#split(OutputSelector)}.
+     *
+     * @param output_selector
+     *            The user defined {@link OutputSelector} for directing the tuples.
+     * @return The {@link PythonSplitStream}
+     */
+    public PythonSplitStream split(OutputSelector output_selector) throws IOException {
+        return new PythonSplitStream(this.stream.split(new PythonOutputSelector(output_selector)));
+    }
+
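The `split` wrapper above hands a user-defined `OutputSelector` to `DataStream#split`. Its routing semantics can be sketched as a plain-Python simulation under loudly stated assumptions: `EvenOddSelector` and `split` are hypothetical names, not Flink API. The selector mirrors the shape of Flink's `OutputSelector.select(value)`, which returns an iterable of output names, and the returned `select` function mimics choosing one or more named outputs from the split stream. An element tagged with several names appears in every matching output.

```python
class EvenOddSelector(object):
    """Hypothetical selector shaped like OutputSelector.select(value) -> names."""

    def select(self, value):
        names = ['even' if value % 2 == 0 else 'odd']
        if value > 10:
            names.append('large')  # an element may be routed to several outputs
        return names


def split(elements, selector):
    """Tag each element once, then return a select(*names) lookup function."""
    tagged = [(e, set(selector.select(e))) for e in elements]

    def select(*names):
        wanted = set(names)
        # Keep elements whose tags intersect the requested output names.
        return [e for e, tags in tagged if tags & wanted]

    return select


select = split([1, 2, 11, 12], EvenOddSelector())
print(select('even'))          # [2, 12]
print(select('odd', 'large'))  # [1, 11, 12]
```

Tagging once and filtering per request keeps the selector call count to one per element, which is the same contract a real selector implies: it decides routing, not downstream processing.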
   

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724303
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import org.apache.flink.streaming.python.api.functions.PythonFilterFunction;
+import org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonKeySelector;
+import org.apache.flink.streaming.python.api.functions.PythonMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonOutputSelector;
+import org.apache.flink.streaming.python.api.functions.PythonSinkFunction;
+import org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+
+/**
+ * A {@code PythonDataStream} is a thin wrapper layer over {@link DataStream}, which represents a
+ * stream of elements of the same type. A {@code PythonDataStream} can be transformed into
+ * another {@code PythonDataStream} by applying various transformation functions, such as
+ *
+ * {@link PythonDataStream#map}
+ * {@link PythonDataStream#split}
+ *
+ *
+ * A thin wrapper layer means that the functionality itself is performed by the
+ * {@link DataStream}, however instead of working directly with the streaming data sets,
+ * this layer handles Python wrappers (e.g. {@code PythonDataStream}) to comply with the
+ * Python standard coding styles.
+ */
+@Public
--- End diff --

I would mark the entire class as `@PublicEvolving`.




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117723282
  
--- Diff: 
flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
 ---
@@ -15,18 +15,18 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 

-from flink.plan.Environment import get_environment
-from flink.functions.MapFunction import MapFunction
-from flink.functions.FlatMapFunction import FlatMapFunction
+import struct
 from flink.functions.FilterFunction import FilterFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.MapFunction import MapFunction
--- End diff --

should be reverted.




[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-07 Thread zohar-mizrahi
GitHub user zohar-mizrahi opened a pull request:

https://github.com/apache/flink/pull/3838

[FLINK-5886] Python API for streaming applications

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zohar-mizrahi/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3838


commit c1333c3424897caa615683d3494b41e7ab88d45d
Author: Zohar Mizrahi 
Date:   2016-11-15T12:46:36Z

[FLINK-5886] Python API for streaming applications



