[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications

2018-01-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/5333
  
Just started to look at your changes and have one comment with respect to 
the plan.py - have you tried executing the same script twice, changing one 
line in the script (e.g., a map function) before the second run? Make sure 
the change takes effect on the second run.
(BTW - I'm not sure I'll be able to spend much time reviewing the whole 
change set in the near future.)


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-24 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
Referring to the issue with the ```PythonEnvironmentConfig``` above - is 
there any other global indication that I can use to test whether a given 
function is executed on the TaskManager?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
One of the critical attributes is 
```PythonEnvironmentConfig::pythonTmpCachePath```, which is used in the 
following places: 
- ```PythonStreamExecutionEnvironment::execute:362```
- ```PythonStreamExecutionEnvironment::execute:400```
- ```PythonStreamBinder::prepareFiles:117```

On the client side, the temporary files are prepared for distribution by 
the ```PythonStreamBinder``` and then processed by the 
```PythonStreamExecutionEnvironment::execute``` function, which is called 
from the Python script. When the Python script is executed on the TaskManager, 
this attribute remains ```null``` and thus ```execute``` returns 
immediately.
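A minimal Python sketch of that guard (all names here are invented for illustration; the real logic lives in the Java classes listed above) - the cache path is populated only on the client, so on the TaskManager `execute` becomes a no-op:

```python
class EnvironmentConfig:
    """Hypothetical stand-in for PythonEnvironmentConfig: the cache path
    is set by the client-side binder and never on the TaskManager."""
    def __init__(self):
        self.python_tmp_cache_path = None

def execute(config):
    if config.python_tmp_cache_path is None:
        # Running inside the TaskManager: the files were already
        # distributed, so there is nothing to do here.
        return "skipped"
    # Client side: prepare the temporary files and submit the job.
    return "submitted"

client_cfg = EnvironmentConfig()
client_cfg.python_tmp_cache_path = "/tmp/flink-python-cache"
print(execute(client_cfg))           # submitted
print(execute(EnvironmentConfig()))  # skipped
```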


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-21 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
The thing is that I use the 
```PythonEnvironmentConfig.pythonTmpCachePath``` attribute to deliver 
information from the ```PythonStreamBinder``` to a class that is called from 
the Python script. 
How would you suggest doing it otherwise?


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-21 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
I'm trying to track down the root cause of the check failures, without 
success. The given project (flink-libraries/flink-streaming-python) on the 
master branch passes `verify` successfully in my environment.

Please advise,


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-16 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
Regarding the exception - 
```java.io.IOException: java.io.IOException: The given HDFS file URI ...```

In general, using the Python interface requires a valid configuration of a 
shared file system (e.g., HDFS), which is designed to distribute the Python 
files. One can bypass this issue by setting the second argument to `True` 
when calling ```env.execute(...)``` in the Python script.
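For illustration only, a toy environment showing the call shape of that workaround (the class and the exact meaning of the second argument are invented stand-ins, not the PR's actual API):

```python
class ToyStreamEnvironment:
    """Invented stand-in illustrating the call shape; not Flink's API."""
    def execute(self, job_name, local=False):
        if local:
            # Local mode: files need not be distributed to remote
            # TaskManagers, so no shared file system (e.g. HDFS) is needed.
            return "local-run"
        # Cluster mode: requires a configured shared file system.
        return "cluster-run"

env = ToyStreamEnvironment()
print(env.execute("wordcount", True))  # local-run
```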


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-16 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133392462
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the 
Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

My bad - I missed the Java unchecked exceptions (e.g., runtime 
exceptions). It would be much better to use one here. 
As for `read`, we can either return `null` or again use a runtime 
exception.
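The suggestion above - fail with an unchecked exception rather than swallowing the error - can be sketched in Python with a length-prefixed frame analogous to the Kryo `write`/`read` pair (`pickle` and `struct` stand in for Kryo here; this is an illustrative sketch, not the PR's code):

```python
import io
import pickle
import struct

def write_framed(out, obj):
    # Serialize, then write a 4-byte length prefix followed by the payload,
    # mirroring output.writeInt(serPo.length); output.write(serPo).
    try:
        payload = pickle.dumps(obj)
    except Exception as exc:
        # Instead of printing the stack trace and continuing, re-raise as
        # an unchecked error so the failure surfaces at the call site.
        raise RuntimeError("serialization failed") from exc
    out.write(struct.pack(">i", len(payload)))
    out.write(payload)

def read_framed(inp):
    (length,) = struct.unpack(">i", inp.read(4))
    return pickle.loads(inp.read(length))

buf = io.BytesIO()
write_framed(buf, {"answer": 42})
buf.seek(0)
print(read_framed(buf))  # {'answer': 42}
```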


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133192783
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.util.Collector;
+import org.python.core.PyObject;
+
+/**
+ * Collects a {@code PyObject} record and forwards it. It makes sure that 
the record is converted,
+ * if necessary, to a {@code PyObject}.
+ */
+@Public
+public class PythonCollector implements Collector {
+   private Collector collector;
+
+   public void setCollector(Collector collector) {
+   this.collector = collector;
+   }
+
+   @Override
+   public void collect(PyObject record) {
+   PyObject po = UtilityFunctions.adapt(record);
--- End diff --

Actually you're right. This line can be dropped. And now it seems that 
`PythonCollector` is redundant, though it provides an extra safety layer that 
reports casting problems to users in case they provide a UDF from Java 
source code. If their function does not handle PyObject, it will break at 
runtime with something like "object cannot be cast to 'PyObject'". 
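The safety layer being discussed can be sketched as a forwarding collector that normalizes each record before handing it to the wrapped collector (all names here are invented stand-ins, not the PR's code):

```python
class Collector:
    """Minimal stand-in for Flink's Collector interface."""
    def __init__(self):
        self.out = []
    def collect(self, record):
        self.out.append(record)

def adapt(record):
    # Stand-in for UtilityFunctions.adapt(): coerce plain values into the
    # wrapper type downstream operators expect (here, just tag them).
    return ("py", record)

class ForwardingCollector:
    """Converts if necessary, then forwards - the extra layer that turns a
    late ClassCastException into a controlled conversion."""
    def __init__(self, inner):
        self.inner = inner
    def collect(self, record):
        self.inner.collect(adapt(record))

sink = Collector()
ForwardingCollector(sink).collect(42)
print(sink.out)  # [('py', 42)]
```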


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133170150
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import 
org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python 
UDF {@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it 
internally in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, 
then it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * This function is used internally by the Python thin wrapper layer 
over the streaming data
+ * functionality
+ */
+public class PythonOutputSelector implements OutputSelector {
+   private static final long serialVersionUID = 909266346633598177L;
+
+   private final byte[] serFun;
+   private transient OutputSelector fun;
+
+   public PythonOutputSelector(OutputSelector fun) throws 
IOException {
+   this.serFun = SerializationUtils.serializeObject(fun);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public Iterable select(PyObject value) {
+   if (this.fun == null) {
+   try {
+   this.fun = (OutputSelector) 
SerializationUtils.deserializeObject(this.serFun);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

Done.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133169625
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the 
Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
+   }
+   }
+
+   public PyObject read (Kryo kryo, Input input, Class type) {
+   int len = input.readInt();
+   byte[] serPo = new byte[len];
+   input.read(serPo);
+   PyObject po = null;
+   try {
+   po = (PyObject) 
SerializationUtils.deserializeObject(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

Yes. Same as the answer above. To verify it during debugging, I set the 
returned value to null and it continued without issues.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133169331
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the 
Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

Yes, indeed. According to 
https://github.com/EsotericSoftware/kryo#serializers: "By default, serializers 
do not need to handle the object being null."
But anyway, I added a message to the log. 


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133156452
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import 
org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python 
UDF {@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it 
internally in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, 
then it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * This function is used internally by the Python thin wrapper layer 
over the streaming data
+ * functionality
+ */
+public class PythonOutputSelector implements OutputSelector {
+   private static final long serialVersionUID = 909266346633598177L;
+
+   private final byte[] serFun;
+   private transient OutputSelector fun;
+
+   public PythonOutputSelector(OutputSelector fun) throws 
IOException {
+   this.serFun = SerializationUtils.serializeObject(fun);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public Iterable select(PyObject value) {
+   if (this.fun == null) {
+   try {
+   this.fun = (OutputSelector) 
SerializationUtils.deserializeObject(this.serFun);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

The thing is that the signature of the `select` function in the 
`OutputSelector` interface has no exception throw declaration. This is 
why I catch the given exceptions here and add a check in the following lines 
to test whether the variable `this.fun` is null. If it is `null`, the 
function returns null. How can I do it otherwise?
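The pattern under discussion - deserialize the UDF lazily on first call and surface failures as an unchecked error, since the interface declares no checked exceptions - might be sketched in Python as follows (the `LazySelector` class is a hypothetical analogue, not the PR's code; a real output selector would return stream names rather than a number):

```python
import pickle

class LazySelector:
    """Hypothetical analogue of PythonOutputSelector: keeps the UDF in
    serialized form and re-creates it on first use on the worker."""
    def __init__(self, fun):
        self._ser_fun = pickle.dumps(fun)  # mirrors the byte[] serFun field
        self._fun = None                   # mirrors the transient field

    def select(self, value):
        if self._fun is None:
            try:
                self._fun = pickle.loads(self._ser_fun)
            except Exception as exc:
                # The interface offers no checked-exception escape hatch,
                # so fail fast instead of silently returning null.
                raise RuntimeError("could not deserialize UDF") from exc
        return self._fun(value)

# Demo with a picklable built-in standing in for the Python UDF.
sel = LazySelector(abs)
print(sel.select(-7))  # 7
```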


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-15 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133154903
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = 
"run_all_tests.py";
+   final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = 
"src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

Instead, I've provided a simplified version of it and will send it for review.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-14 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132967578
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in 
a python user-defined
+ * function (UDF).
+ * The fact the this field is not set, results in a dynamic calculation 
of this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 
64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python 
class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class 
is created. Its name is
+ * constructed using the following pattern:
+ * {@code 
org.python.proxies.$$}. The {@code 
}
+ * part is increased by one in runtime, for every job submission. It 
results in different serial
+ * version UID for each run for the same Python class. Therefore, it is 
required to silently
+ * suppress the serial version UID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
+
+   public PythonObjectInputStream2(InputStream in) throws IOException {
+   super(in);
+   }
+
+   protected ObjectStreamClass readClassDescriptor() throws IOException, 
ClassNotFoundException {
+   ObjectStreamClass resultClassDescriptor = 
super.readClassDescriptor(); // initially streams descriptor
+
+   Class localClass;
+   try {
+   localClass = resolveClass(resultClassDescriptor);
+   } catch (ClassNotFoundException e) {
+   System.out.println("No local class for " + 
resultClassDescriptor.getName());
--- End diff --

The purpose of this function is to try to return the class according to the 
given descriptor. If it fails, it probably means that the Jython interpreter 
was not yet initialised, in which case it is initialised. This is handled in 
`org.apache.flink.streaming.python.api.functions.UtilityFunctions::smartFunctionDeserialization`.

I'm currently checking whether the `catch` here is redundant and a 
left-over from the debugging phase. We can probably let the exception 
propagate up the call stack.
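The fallback this function performs - prefer the locally resolved class over the name recorded in the stream - has a rough Python analogue in a custom `Unpickler` that redirects stale class names (the alias map and names below are invented for illustration):

```python
import io
import pickle

class RenamingUnpickler(pickle.Unpickler):
    """Redirect class names found in the stream to local ones; loosely
    analogous to overriding readClassDescriptor() so the locally resolved
    class wins over the (auto-numbered) name recorded in the stream."""
    ALIASES = {"Cmplx": "complex"}  # hypothetical stream name -> local name

    def find_class(self, module, name):
        return super().find_class(module, self.ALIASES.get(name, name))

# Simulate a stream that recorded the class under a stale name.
data = pickle.dumps(complex, protocol=0).replace(b"complex", b"Cmplx")
obj = RenamingUnpickler(io.BytesIO(data)).load()
print(obj is complex)  # True
```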


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-13 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132843982
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new 
PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  Pass a custom configuration into the cluster
+* @return A local execution environment with the specified parallelism.
+ 

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-13 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132842489
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = 
"run_all_tests.py";
+   final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = 
"src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

I agree that the function names here are a bit confusing. In essence, this 
function locates a single test file, while the function in the next code line, 
`getFilesInFolder`, collects files that start with `test_`, so the main test 
file `run_all_tests.py` is filtered out and not included.
To make this more readable and robust, I changed `getFilesInFolder` to accept 
an additional `excludes` argument and call it with the main test file in 
`excludes`.
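
To make the intent concrete, here is a plain-Java sketch of the filtering described above. The helper name `getFilesInFolder` and the `excludes` idea come from the comment; the exact signature, the `prefix` parameter, and the surrounding class are assumptions for illustration, not the PR's actual code:

```java
import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TestFileCollectorSketch {

	/**
	 * Collects regular files in {@code folder} whose names start with
	 * {@code prefix}, skipping any name listed in {@code excludes}
	 * (e.g. the main test runner script).
	 */
	public static List<File> getFilesInFolder(File folder, String prefix, List<String> excludes) {
		List<File> result = new ArrayList<>();
		File[] entries = folder.listFiles();
		if (entries == null) {
			return result; // not a directory, or an I/O error occurred
		}
		for (File entry : entries) {
			if (entry.isFile()
					&& entry.getName().startsWith(prefix)
					&& !excludes.contains(entry.getName())) {
				result.add(entry);
			}
		}
		return result;
	}

	public static void main(String[] args) throws Exception {
		File dir = Files.createTempDirectory("py-tests").toFile();
		new File(dir, "test_map.py").createNewFile();
		new File(dir, "test_runner.py").createNewFile();
		// Excluding "test_runner.py" leaves only "test_map.py".
		System.out.println(getFilesInFolder(dir, "test_", Arrays.asList("test_runner.py")).size());
	}
}
```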


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-13 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132841381
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-libraries</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-python_${scala.binary.version}</artifactId>
+	<name>flink-streaming-python</name>
+	<packaging>jar</packaging>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+					<archive>
+						<manifest>
+							<addClasspath>true</addClasspath>
+							<mainClass>org.apache.flink.streaming.python.api.PythonStreamBinder</mainClass>
+						</manifest>
+					</archive>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.python</groupId>
+			<artifactId>jython-standalone</artifactId>
+			<version>2.7.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
--- End diff --

Done for the Scala version. 

As for not being able to run the tests:
1. I fixed the issue with the ```TypeError: object of type 
'java.lang.Class' has no len()```.
2. I still can't reproduce the main issue, concerning the import of a Java 
class from a Python module:
File: 
```flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py```
Line 19: ```from org.apache.flink.api.java.utils import ParameterTool```

The given class (`ParameterTool`) resides in a different project, `flink-java`, 
and the Jython module cannot find it. This probably concerns the CLASSPATH.

Any suggestion for how to reproduce it?


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
In the last change, I've rebased locally on top of origin/master, so I did 
`git push -f` to the master branch in my fork.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117864592
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-libraries</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-python_2.10</artifactId>
+	<name>flink-streaming-python</name>
+	<packaging>jar</packaging>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+					<archive>
+						<manifest>
+							<addClasspath>true</addClasspath>
+							<mainClass>org.apache.flink.streaming.python.api.PythonStreamBinder</mainClass>
+						</manifest>
+					</archive>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
--- End diff --

I'm not sure what you mean. Please explain.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117807878
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -233,6 +234,20 @@ boolean holdsStillReference(String name, JobID jobId) {
//  Utilities
// 

 
+   /**
+* Remove a given path recursively if exists. The path can be a 
filepath or directory.
+*
+* @param path  The root path to remove.
+* @throws IOException
+* @throws URISyntaxException
+*/
+   public static void clearPath(String path) throws IOException, 
URISyntaxException {
--- End diff --

My bad! I don't use it anymore.


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117806756
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+   private final StreamExecutionEnvironment env;
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+
+   /**
+* A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new PythonStreamExecutionEnvironment();
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  Pass a custom configuration into the cluster
+* @return A local execution environment with the specified parallelism.
+*/
+   public static PythonStream

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117793843
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+   private final StreamExecutionEnvironment env;
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+
+   /**
+* A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new PythonStreamExecutionEnvironment();
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  Pass a custom configuration into the cluster
+* @return A local execution environment with the specified parallelism.
+*/
+   public static PythonStream

[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117788892
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in 
a python user-defined
+ * function (UDF).
+ * The fact that this field is not set results in a dynamic calculation of this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class is created. Its name is
+ * constructed using the following pattern:
+ * {@code org.python.proxies.<module>$<class>$<num>}. The {@code <num>}
+ * part is increased by one at runtime, for every job submission. It results in a different serial
+ * version UID for each run of the same Python class. Therefore, it is required to silently
+ * suppress the serial version UID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
--- End diff --

How would you suggest naming it? It extends `PythonObjectInputStream`.
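
For context, the "silent suppression" described in the class javadoc is usually done with the standard `ObjectInputStream` technique of substituting the locally loaded class descriptor when only the serialVersionUID differs. Below is a minimal, dependency-free sketch of that pattern; it is not the PR's actual `PythonObjectInputStream2` code, which extends Jython's `PythonObjectInputStream` instead of `ObjectInputStream` directly:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.Arrays;

/** ObjectInputStream that ignores serialVersionUID mismatches by using the local descriptor. */
class LenientObjectInputStream extends ObjectInputStream {

	LenientObjectInputStream(InputStream in) throws IOException {
		super(in);
	}

	@Override
	protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
		ObjectStreamClass streamDesc = super.readClassDescriptor();
		try {
			Class<?> localClass = Class.forName(streamDesc.getName());
			ObjectStreamClass localDesc = ObjectStreamClass.lookup(localClass);
			if (localDesc != null
					&& localDesc.getSerialVersionUID() != streamDesc.getSerialVersionUID()) {
				// The class exists locally but its UID differs (e.g. a regenerated
				// Jython proxy class): trust the local descriptor instead.
				return localDesc;
			}
		} catch (ClassNotFoundException e) {
			// Unknown class: fall through and let default resolution report it.
		}
		return streamDesc;
	}

	public static void main(String[] args) throws Exception {
		// Round-trip a plain serializable object through the lenient stream.
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
			oos.writeObject(Arrays.asList("a", "b"));
		}
		try (ObjectInputStream in =
				new LenientObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
			System.out.println(in.readObject());
		}
	}
}
```

When the UIDs match (the common case) the stream descriptor is returned unchanged, so normal deserialization is unaffected.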


---


[GitHub] flink pull request #3826: [FLINK-5886] Python API for streaming applications

2017-05-08 Thread zohar-mizrahi
Github user zohar-mizrahi closed the pull request at:

https://github.com/apache/flink/pull/3826


---


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-07 Thread zohar-mizrahi
GitHub user zohar-mizrahi opened a pull request:

https://github.com/apache/flink/pull/3838

[FLINK-5886] Python API for streaming applications

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zohar-mizrahi/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3838


commit c1333c3424897caa615683d3494b41e7ab88d45d
Author: Zohar Mizrahi <zohar.mizr...@parallelmachines.com>
Date:   2016-11-15T12:46:36Z

[FLINK-5886] Python API for streaming applications




---


[GitHub] flink pull request #3827: [FLINK-5886] Python API for streaming applications

2017-05-04 Thread zohar-mizrahi
Github user zohar-mizrahi closed the pull request at:

https://github.com/apache/flink/pull/3827


---


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-04 Thread Zohar Mizrahi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996944#comment-15996944
 ] 

Zohar Mizrahi commented on FLINK-5886:
--

No problem - I'll rebase on top of master.

As for #3826 - maybe you referred to another ticket, because I'm not familiar 
with this one.

> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>    Reporter: Zohar Mizrahi
>    Assignee: Zohar Mizrahi
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3827: [FLINK-5886] Python API for streaming applications

2017-05-04 Thread zohar-mizrahi
GitHub user zohar-mizrahi opened a pull request:

https://github.com/apache/flink/pull/3827

[FLINK-5886] Python API for streaming applications

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zohar-mizrahi/flink python-streaming

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3827.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3827


commit 03684e8e460143013babb2ec88c66c8fa1119c43
Author: Zohar Mizrahi <zohar.mizr...@parallelmachines.com>
Date:   2017-04-09T09:11:57Z

[FLINK-6177] Add support for "Distributed Cache" in streaming applications

commit 7e6374f13a1846b6982923083ee98140c37d5903
Author: Zohar Mizrahi <zohar.mizr...@parallelmachines.com>
Date:   2017-04-20T08:19:38Z

[FLINK-6177] Apply suggested fixes from a review

commit 8606fafff92bbcea48c64a6accf87ec2b6802b46
Author: Zohar Mizrahi <zohar.mizr...@parallelmachines.com>
Date:   2017-04-20T16:38:49Z

[FLINK-6177] Combine the streaming & batch distributed cache tests and 
inherit from StreamingMultipleProgramsTestBase

commit fa74b64d100e51f4a8776b613714581c585b1ddd
Author: Zohar Mizrahi <zohar.mizr...@parallelmachines.com>
Date:   2017-05-01T13:07:14Z

[FLINK-6177] Add missing file cache functions to the scala 
StreamExecutionEnvironment

commit cbbd86b59a4a5561fd383a704bc95c5c1c255449
Author: Zohar Mizrahi <zohar.mizr...@parallelmachines.com>
Date:   2016-11-15T12:46:36Z

[FLINK-5886] Python API for streaming applications




---


[GitHub] flink pull request #3826: [FLINK-5886] Python API for streaming applications

2017-05-04 Thread zohar-mizrahi
GitHub user zohar-mizrahi opened a pull request:

https://github.com/apache/flink/pull/3826

[FLINK-5886] Python API for streaming applications

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zohar-mizrahi/flink python-streaming

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3826.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3826


commit 7d1b9f0bb2dea719dc438cb8649f200cc6235980
Author: Robert Metzger <rmetz...@apache.org>
Date:   2017-01-03T09:05:54Z

[FLINK-4861][hotfix] Fix change-scala-version script for opt assembly

commit a6a5b21ef8c7fdd7d073296208f47d47ca6a
Author: Sachin <sachingoel0...@gmail.com>
Date:   2017-01-03T10:28:11Z

[FLINK-5382][web-frontend] Fix problems with downloading TM logs on Yarn

commit 335175e6eefc260cf1600544639594d85836f7d8
Author: Ivan Mushketyk <ivan.mushke...@gmail.com>
Date:   2016-12-16T07:56:46Z

[FLINK-5349] [docs] Fix typos in Twitter connector example
This closes #3015.

commit bb46fffe310f9cd6f667293df14d98e90011d591
Author: Abhishek R. Singh <abhis...@tetrationanalytics.com>
Date:   2016-12-14T14:05:11Z

[FLINK-5323] [docs] Replace CheckpointNotifier with CheckpointListener

This closes #3006.

commit 24109cb2692f1f0dd2b9f8c9c8dcc02e55148bab
Author: zentol <ches...@apache.org>
Date:   2016-11-25T12:27:43Z

[FLINK-4870] Fix path handling in ContinuousFileMonitoringFunction

This closes #2887.

commit b50bbcc8853c1c2ebcdba9c74a70bfdfbe6557ab
Author: Boris Osipov <boris_osi...@epam.com>
Date:   2016-12-16T07:30:33Z

[FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape

This closes #3019.

commit 9c0c19aae5b78de71c91a735a76cd9196dc8482c
Author: zentol <ches...@apache.org>
Date:   2016-11-25T11:51:38Z

[FLINK-5160] Fix SecurityContextTest#testCreateInsecureHadoopContext on 
Windows

This closes #2888.

commit 91f9a1acaa899159a0d907528634bd246e6854b4
Author: Stephan Ewen <se...@apache.org>
Date:   2017-01-04T23:18:13Z

[FLINK-5408] [RocksDB backend] Uniquify RocksDB JNI library path to avoid 
multiple classloader problem

commit fb48c3b4cbc5a186cb7b812c8d05833c5852b385
Author: Stephan Ewen <se...@apache.org>
Date:   2017-01-05T13:44:00Z

[FLINK-4890] [core] Make GlobFilePathFilter work on Windows

commit 3554c96d118a411906a22b1f1087de073617a4c7
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-12-28T11:50:00Z

[FLINK-5397] [runtime] Do not replace ObjectStreamClass on deserialization 
of migration package classes, override resolveClass(...) instead

This closes #3050

commit 700cbd464345e9c180cfef58a4082b2e39d27160
Author: Stephan Ewen <se...@apache.org>
Date:   2017-01-05T16:30:37Z

[hotfix] Set default test logger back to 'OFF' in 'flink-tests'

commit 65a32e74175c026f04ab058ee2ace8c9e7012d76
Author: zentol <ches...@apache.org>
Date:   2017-01-05T14:39:25Z

[FLINK-5160] SecurityUtils use OperatingSystem.getCurrentOperatingSystem()

commit aaf9612791284633727a0951d0d45292ef5e233c
Author: zentol <ches...@apache.org>
Date:   2017-01-05T15:18:38Z

[FLINK-5412] Enable RocksDB tests on Windows OS

commit 9d99f2bd4a29b748905e55a774ff04f933b6b00f
Author: Alexey Diomin <diomi...@gmail.com>
Date:   2016-07-04T15:13:11Z

[FLINK-4148] Fix min distance calculation in QuadTree

commit b93f80afc7c50c7fefc850b620bb571523343595
Author: Sachin Goel <sachingoel0...@gmail.com>
Date:   2017-01-05T18:00:26Z

[FLINK-5119][web-frontend] Fix problems in displaying TM heartbeat and path.

commit 7251a4ca9539200ba4c894c1

[jira] [Closed] (FLINK-6283) Enable to clear a given file cache path

2017-04-18 Thread Zohar Mizrahi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zohar Mizrahi closed FLINK-6283.

Resolution: Won't Do

> Enable to clear a given file cache path 
> 
>
> Key: FLINK-6283
> URL: https://issues.apache.org/jira/browse/FLINK-6283
> Project: Flink
>  Issue Type: Improvement
>        Reporter: Zohar Mizrahi
>    Assignee: Zohar Mizrahi
>Priority: Minor
>
> In the context of the FileCache functionality that is used within the 
> distributed cache flow, add functionality to clear a given path.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6283) Enable to clear a given file cache path

2017-04-18 Thread Zohar Mizrahi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972776#comment-15972776
 ] 

Zohar Mizrahi commented on FLINK-6283:
--

Yes, this is how I implemented it. In addition, I added a conditional check 
on whether the path exists (as far as I remember, to avoid an exception).
But as you mentioned, nothing fancy here. I'll skip this change.
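The guard mentioned above can be sketched in a few lines of Python; `clear_path` is an illustrative name, not the actual FileCache#clearPath signature:

```python
import shutil
from pathlib import Path


def clear_path(path):
    """Remove a cache path if it exists; a no-op otherwise.

    Minimal sketch of the existence check described above, which avoids
    raising when the path was never created (or was already cleared).
    """
    p = Path(path)
    if p.is_dir():
        # The conditional check: only delete when the directory is there.
        shutil.rmtree(p)
    elif p.exists():
        p.unlink()
```

Calling it twice on the same path is safe, which is the point of the guard.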






[jira] [Commented] (FLINK-6283) Enable to clear a given file cache path

2017-04-18 Thread Zohar Mizrahi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972673#comment-15972673
 ] 

Zohar Mizrahi commented on FLINK-6283:
--

Yep, FileCache#copy, and that is why I also added FileCache#clearPath. 

Assuming the random number generator is strong enough, a given temporary folder 
name would not repeat itself (so I may skip this change). Actually, I have to make 
sure to clean up the temporary folder once the execution completes. We 
don't really want to consume storage/memory (depending on how the /tmp folder is 
mounted) over time.
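The lifecycle described above (uniquely named temporary folder, cleanup once execution completes) can be sketched as follows; `run_with_staging` and `job` are illustrative names, not Flink code:

```python
import shutil
import tempfile
from pathlib import Path


def run_with_staging(job):
    """Stage work in a uniquely named temp folder and always clean it up.

    `tempfile.mkdtemp` gives a fresh random suffix per run (so folders do
    not repeat), and the `finally` block reclaims /tmp space even when the
    job fails.
    """
    staging = Path(tempfile.mkdtemp(prefix="flink-python-"))
    try:
        return job(staging)
    finally:
        # Cleanup once the execution is completed, success or failure.
        shutil.rmtree(staging, ignore_errors=True)
```

A `job` here is any callable that works inside the staging directory and returns a result.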






[jira] [Commented] (FLINK-6283) Enable to clear a given file cache path

2017-04-18 Thread Zohar Mizrahi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972635#comment-15972635
 ] 

Zohar Mizrahi commented on FLINK-6283:
--

In one case: when the python files are prepared for client-side execution. A 
temporary folder (its name concatenated with a random number) is created locally, 
outside the job scope. All the python files (along with packages) are then 
copied to that temporary folder for local execution.






[jira] [Commented] (FLINK-6283) Enable to clear a given file cache path

2017-04-18 Thread Zohar Mizrahi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972346#comment-15972346
 ] 

Zohar Mizrahi commented on FLINK-6283:
--

It refers to a file cache created by a python streaming application. For that 
purpose, it uses {{org.apache.flink.runtime.filecache.FileCache}}.






[jira] [Created] (FLINK-6283) Enable to clear a given file cache path

2017-04-09 Thread Zohar Mizrahi (JIRA)
Zohar Mizrahi created FLINK-6283:


 Summary: Enable to clear a given file cache path 
 Key: FLINK-6283
 URL: https://issues.apache.org/jira/browse/FLINK-6283
 Project: Flink
  Issue Type: Improvement
Reporter: Zohar Mizrahi
Assignee: Zohar Mizrahi
Priority: Minor


In the context of the FileCache functionality that is used within the 
distributed cache flow, add functionality to clear a given path.





[jira] [Created] (FLINK-6177) Add support for "Distributed Cache" in streaming applications

2017-03-23 Thread Zohar Mizrahi (JIRA)
Zohar Mizrahi created FLINK-6177:


 Summary: Add support for "Distributed Cache" in streaming 
applications
 Key: FLINK-6177
 URL: https://issues.apache.org/jira/browse/FLINK-6177
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API
Reporter: Zohar Mizrahi
Assignee: Zohar Mizrahi








[jira] [Updated] (FLINK-5886) Python API for streaming applications

2017-03-21 Thread Zohar Mizrahi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zohar Mizrahi updated FLINK-5886:
-
Description: 
A work in progress to provide a Python interface for the Flink streaming APIs. The 
core technology is based on Jython and thus imposes two limitations: (a) user-defined 
functions cannot use Python extensions; (b) the Python version is 2.x.

The branch is based on Flink release 1.2.0, as can be found here:
https://github.com/zohar-pm/flink/tree/python-streaming

In order to test it, one can use the IntelliJ IDE. Assuming IntelliJ was set up 
properly (see: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
 one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
which in turn will execute all the tests under 
{{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}

  was:
A work in progress to provide python interface for Flink streaming APIs. The 
core technology is based on jython and thus imposes two limitations: a. user 
defined functions cannot use python extensions. b. the python version is 2.x

The branch is based on Flink release 1.2.0, as can be found here:
https://github.com/zohar-pm/flink/tree/python-streaming

In order to test it, someone can run 
`flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/jython/PythonStreamBinder.java`
 from IntelliJ IDE (assuming IntelliJ IDE is configured properly).







[jira] [Created] (FLINK-5992) Enable file registration at distributed cache in stream execution environment

2017-03-08 Thread Zohar Mizrahi (JIRA)
Zohar Mizrahi created FLINK-5992:


 Summary: Enable file registration at distributed cache in stream 
execution environment
 Key: FLINK-5992
 URL: https://issues.apache.org/jira/browse/FLINK-5992
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Zohar Mizrahi


Create a new API in the stream execution environment to enable file registration at 
the distributed cache. The file will be accessible from any user-defined 
function in the (distributed) runtime under the given path.
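The proposed registration flow can be sketched with a toy, dependency-free stand-in; the class name, `register_cached_file`, and `distribute_to` are hypothetical illustrations of the idea, not the actual Flink API:

```python
import shutil
from pathlib import Path


class DistributedCacheStub:
    """Toy stand-in for the proposed registration API (hypothetical names)."""

    def __init__(self):
        self._entries = {}  # registered name -> source path on the client

    def register_cached_file(self, file_path, name):
        # Client side: remember the file under a logical name,
        # mirroring the proposed registration call on the environment.
        self._entries[name] = Path(file_path)

    def distribute_to(self, work_dir):
        # Runtime side: each registered file is copied next to the task,
        # so a user-defined function can resolve it by its logical name.
        copied = {}
        for name, src in self._entries.items():
            dst = Path(work_dir) / name
            shutil.copy(src, dst)
            copied[name] = dst
        return copied
```

A UDF would then look the file up by name rather than by the client-side path.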





[jira] [Created] (FLINK-5886) Python API for streaming applications

2017-02-22 Thread Zohar Mizrahi (JIRA)
Zohar Mizrahi created FLINK-5886:


 Summary: Python API for streaming applications
 Key: FLINK-5886
 URL: https://issues.apache.org/jira/browse/FLINK-5886
 Project: Flink
  Issue Type: New Feature
  Components: Python API
Reporter: Zohar Mizrahi


A work in progress to provide a Python interface for the Flink streaming APIs. The 
core technology is based on Jython and thus imposes two limitations: (a) user-defined 
functions cannot use Python extensions; (b) the Python version is 2.x.

The branch is based on Flink release 1.2.0, as can be found here:
https://github.com/zohar-pm/flink/tree/python-streaming

In order to test it, someone can run 
`flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/jython/PythonStreamBinder.java`
 from IntelliJ IDE (assuming IntelliJ IDE is configured properly).


