[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368277#comment-16368277
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3838


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
> Issue Type: New Feature
> Components: Python API
> Reporter: Zohar Mizrahi
> Assignee: Zohar Mizrahi
> Priority: Major
> Fix For: 1.5.0
>
>
> A work in progress to provide a Python interface for the Flink streaming APIs. The 
> core technology is based on Jython and thus imposes two limitations: (a) user-defined 
> functions cannot use Python extensions; (b) the Python version is 2.x.
> The branch is based on Flink release 1.2.0 and can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> To test it, one can use the IntelliJ IDE. Assuming IntelliJ is 
> set up properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
> one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in turn will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368278#comment-16368278
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5333




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368232#comment-16368232
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5333
  
merging.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337542#comment-16337542
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5333#discussion_r163537435
  
--- Diff: flink-dist/src/main/assemblies/bin.xml ---
@@ -28,7 +28,7 @@ under the License.
true
flink-${project.version}
 
-   
+   
--- End diff --

this should be reverted




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337442#comment-16337442
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user meyer-net commented on the issue:

https://github.com/apache/flink/pull/3838
  
Does version 1.5 support the Python API for streaming applications?




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337386#comment-16337386
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5333
  
Yup, running a modified script works properly without conflicts.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334456#comment-16334456
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5333
  
Given that each task has a separate jython instance (separated by a 
classloader), there shouldn't be any conflicts though.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334453#comment-16334453
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5333
  
I'll double check to make sure it works.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334448#comment-16334448
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/5333
  
I just started to look at your changes and have one comment regarding 
plan.py: have you tried executing the same script twice, changing one line 
in the script (e.g., a map function) before the second run? Make sure the 
change takes effect in the second run.
(BTW, I'm not sure I'll be able to spend much time reviewing all of the 
changes in the near future.)




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334353#comment-16334353
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5333
  
@zohar-mizrahi FYI




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326332#comment-16326332
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
Here's a preliminary changelog:

General:
- rebased branch onto current master
- incremented version to 1.5-SNAPSHOT
- fixed kafka-connector dependency declaration
  - set to provided
  - scala version set to scala.binary.version
  - flink version set to project.version
- applied checkstyle
  - disabled method/parameter name rules for API classes
- assigned flink-python-streaming to 'libraries' travis profile

API:
- PDS#map()/flat_map() now return PythonSingleOutputStreamOperator
- renamed PDS#print() to PDS#output()
  - print is a keyword in python and thus not usable in native python APIs
- added PythonSingleOutputStreamOperator#name()
- removed env#execute methods that accepted a local execution argument, as
  they are redundant due to the environment factory methods

Moved/Renamed:
- made SerializerMap a top-level class and renamed it to AdapterMap
- moved UtilityFunctions#adapt to the AdapterMap class
- renamed UtilityFunctions to InterpreterUtils
- moved PythonObjectInputStream2 to SerializationUtils
- renamed PythonObjectInputStream2 to
  SerialVersionOverridingPythonObjectInputStream

Functions:
- introduced AbstractPythonUDF class for sharing
  RichFunction#open()/close() implementations
- PythonOutputSelector now throws FlinkRuntimeException when failing during
  initialization
- added generic return type to SerializationUtils#deserializeObject
- added new serializers for PyBoolean/-Float/-Integer/-Long/-String
- PyObjectSerializer now properly fails when an exception occurs
- improved error printing

- PythonCollector now typed to Object and properly converts non-PyObjects
- jython functions that use a collector now have Object as output type
  - otherwise you would get a ClassCastException if jython returns
    something that isn't a PyObject

PythonStreamBinder:
- adjusted to follow PythonPlanBinder structure
- client-like main() exception handling
- replaced Random usage with UUID.randomUUID()
- now loads GlobalConfiguration
- local/distributed tmp dir now configurable
- introduced PythonOptions
- no longer generates plan.py but instead imports it directly via the
  PythonInterpreter

Environment:
- Reworked static environment factory methods from 
PythonStreamExecutionEnvironment into a PythonEnvironmentFactory
- program main() method now accepts a PythonEnvironmentFactory
- directories are now passed properly to the environment instead of using 
static fields
- removed PythonEnvironmentConfig

Tests:
- removed 'if __name__ == '__main__':' blocks from tests since the
  condition is never fulfilled
- removed python TestBase class
- removed print statements from tests
- standardized test job names
- cleaned up PythonStreamBinderTest / made it more consistent with
  PythonPlanBinderTest
- run_all_tests improvements
  - stop after first failure
  - print stacktrace on failure
  - no longer relies on dirname() to get cwd but uses the module file
    location instead
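
The `UUID.randomUUID()` item in the PythonStreamBinder list above can be illustrated with a small sketch. The changelog does not say what the random value is used for; naming a per-run temporary directory is an assumption here, and the class name `TmpDirNameSketch` and the `flink_streaming_plan_` prefix are hypothetical, not taken from the PR.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

// Hypothetical helper, not part of Flink: derives a unique directory name per run.
class TmpDirNameSketch {

    // Appends a random (version 4) UUID to a made-up prefix, so two runs are
    // very unlikely to collide, without having to seed or share a Random instance.
    static Path uniqueTmpDir(String baseDir) {
        return Paths.get(baseDir, "flink_streaming_plan_" + UUID.randomUUID());
    }

    public static void main(String[] args) {
        String base = System.getProperty("java.io.tmpdir");
        // Two calls yield two different paths under the same base directory.
        System.out.println(uniqueTmpDir(base));
        System.out.println(uniqueTmpDir(base));
    }
}
```

Compared with `new Random().nextInt()`, a UUID gives a far larger value space and a fixed-length string form, which is convenient for file names.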




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326327#comment-16326327
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
I've been digging into this for the past week. I found a number of things 
to improve and did so in a local branch. Once I've finalized and tested things 
(probably tomorrow) I'll link the branch here or open another PR.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145095#comment-16145095
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
There is none that I'm aware of. It is also possible for the JM and TM to 
run in the same JVM, say for tests or in local mode.

I can't think of a nice way to solve this, so I suggest we simply disable 
the check for the PythonEnvironmentConfig class.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139977#comment-16139977
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
Referring to the issue with ```PythonEnvironmentConfig``` above: is 
there any other global indication that I can use to test whether a given 
function is executed on the TaskManager?




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136571#comment-16136571
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
One of the critical attributes is 
```PythonEnvironmentConfig::pythonTmpCachePath```, which is used in the 
following places: 
- ```PythonStreamExecutionEnvironment::execute:362```
- ```PythonStreamExecutionEnvironment::execute:400```
- ```PythonStreamBinder::prepareFiles:117```

On the client side, the temporary files are prepared for distribution by 
the ```PythonStreamBinder``` and then processed by 
the ```PythonStreamExecutionEnvironment::execute``` function, which is called 
from the Python script. When the Python script is executed on the TaskManager, 
this attribute remains ```null```, and thus ```execute``` returns 
immediately.
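
The guard described above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code from the PR: the class `StaticGuardSketch` is hypothetical, its static field only stands in for `PythonEnvironmentConfig::pythonTmpCachePath`, and the boolean return value is added purely so the two cases can be told apart.

```java
// Hypothetical illustration of a static field used as an "am I on the client?" flag.
class StaticGuardSketch {

    // Stand-in for PythonEnvironmentConfig.pythonTmpCachePath:
    // assigned on the client by the binder, left null in other JVMs.
    static String pythonTmpCachePath;

    // Returns true if the job would be submitted, false if the call is skipped.
    static boolean execute() {
        if (pythonTmpCachePath == null) {
            // No client-side preparation happened in this JVM, so do nothing.
            return false;
        }
        // On the client, the prepared files would be distributed and the job submitted here.
        return true;
    }

    public static void main(String[] args) {
        System.out.println(execute());          // field not set yet: call is skipped
        pythonTmpCachePath = "/tmp/example_cache";
        System.out.println(execute());          // field set: call proceeds
    }
}
```

Because the flag is JVM-wide mutable state, the result of `execute()` depends on whether any other code in the same JVM has assigned the field.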




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136527#comment-16136527
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
The only usage I found is in 
`UtilityFunctions#smartFunctionDeserialization`, which is only called from 
various Java UDF classes. Unless there is another usage hidden somewhere, I 
would suggest adding a `PythonEnvironmentConfig` argument to the 
`smartFunctionDeserialization` method and to all UDF classes.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135429#comment-16135429
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
The thing is that I use the 
```PythonEnvironmentConfig.pythonTmpCachePath``` attribute to deliver 
information from the ```PythonStreamBinder``` to a class that is called from 
the python script. 
How would you suggest doing it otherwise?




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135030#comment-16135030
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
The check failed because the spotbugs plugin found something; this plugin 
isn't run by default when calling `mvn verify`. You can run spotbugs 
locally by adding `-Dspotbugs` to the Maven invocation.

The problem it found is in the PythonEnvironmentConfig class, which contains 
public static non-final fields. I propose making these non-static and 
explicitly passing a config object around where needed.
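
A minimal sketch of the proposal above, not taken from the pull request: the settings become final instance fields of a config object, and the object is handed explicitly to whichever class needs it. `PythonConfigSketch`, `ScriptLocatorSketch` and the `scriptName` field are hypothetical names used only for illustration; `pythonTmpCachePath` is borrowed from the discussion.

```java
import java.io.Serializable;

// Hypothetical stand-in for PythonEnvironmentConfig: values are set once in
// the constructor instead of living in public static non-final fields.
final class PythonConfigSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String pythonTmpCachePath; // name borrowed from the thread
    private final String scriptName;         // illustrative extra field

    PythonConfigSketch(String pythonTmpCachePath, String scriptName) {
        this.pythonTmpCachePath = pythonTmpCachePath;
        this.scriptName = scriptName;
    }

    String getPythonTmpCachePath() {
        return pythonTmpCachePath;
    }

    String getScriptName() {
        return scriptName;
    }
}

// Hypothetical consumer: it receives the config explicitly instead of
// reading shared global state.
final class ScriptLocatorSketch {
    private final PythonConfigSketch config;

    ScriptLocatorSketch(PythonConfigSketch config) {
        this.config = config;
    }

    String resolveScriptPath() {
        return config.getPythonTmpCachePath() + "/" + config.getScriptName();
    }
}
```

Because the object is immutable and serializable, it could in principle travel with whatever needs it, though how that fits the binder and the Python script in this PR is left open here.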




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135018#comment-16135018
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
I've been trying to track down the root cause of the check failures, without 
success. The project in question (flink-libraries/flink-streaming-python) 
in the master branch passes `verify` successfully in my environment.

Please advise.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128767#comment-16128767
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
Regarding the exception: 
```java.io.IOException: java.io.IOException: The given HDFS file URI ...```

In general, using the Python interface requires a valid configuration of a 
shared file system (e.g. HDFS), which is used to distribute the Python files. 
You can bypass this requirement by setting the second argument to 'True' when 
calling ```env.execute(...)``` in the Python script.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128500#comment-16128500
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133392462
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

My bad - I overlooked Java's unchecked exceptions (e.g. runtime 
exceptions). It would be much better to use one here. 
As for `read`, we can either return `null` or again throw a runtime 
exception.
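
To illustrate the unchecked-exception approach, here is a sketch that uses plain `java.io` serialization rather than the Kryo `Serializer` API or the PR's `SerializationUtils`. The class `UncheckedSerializationSketch` and its two methods are hypothetical; the only point is that a checked `IOException` is rethrown as an unchecked exception with the original cause attached, for both the write and the read direction.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper, not the PR's code: neither method declares a checked
// exception, yet a failure still surfaces with its real cause.
final class UncheckedSerializationSketch {

    static byte[] write(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            // Rethrow instead of printing the stack trace and carrying on.
            throw new UncheckedIOException("Failed to serialize object", e);
        }
    }

    static Object read(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to deserialize object", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of serialized object not found", e);
        }
    }
}
```

Throwing from `read` as well means a corrupt payload is reported where it is detected, rather than as a `null` that some later caller has to interpret.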




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127236#comment-16127236
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133192783
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.util.Collector;
+import org.python.core.PyObject;
+
+/**
+ * Collects a {@code PyObject} record and forwards it. It makes sure that the record is converted,
+ * if necessary, to a {@code PyObject}.
+ */
+@Public
+public class PythonCollector implements Collector {
+   private Collector collector;
+
+   public void setCollector(Collector collector) {
+   this.collector = collector;
+   }
+
+   @Override
+   public void collect(PyObject record) {
+   PyObject po = UtilityFunctions.adapt(record);
--- End diff --

Actually, you're right. This line can be dropped. With that change, the 
`PythonCollector` looks redundant, though it still provides a safety layer that 
informs users about casting problems when they provide a UDF from Java 
source code. If their function does not handle PyObject, it will break at 
runtime with a message saying that an object cannot be cast to 'PyObject'. 
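
The kind of safety layer described above could look roughly like the following sketch. It is not the `PythonCollector` from the PR: `TypeCheckingCollectorSketch` is a hypothetical, dependency-free class that checks the runtime type of each record and reports a descriptive error instead of letting a bare `ClassCastException` surface further downstream.

```java
import java.util.function.Consumer;

// Hypothetical wrapper: validates each record before forwarding it.
final class TypeCheckingCollectorSketch<T> {
    private final Class<T> expectedType;
    private final Consumer<T> downstream;

    TypeCheckingCollectorSketch(Class<T> expectedType, Consumer<T> downstream) {
        this.expectedType = expectedType;
        this.downstream = downstream;
    }

    void collect(Object record) {
        if (!expectedType.isInstance(record)) {
            String actual = record == null ? "null" : record.getClass().getName();
            // Name both types so the user can see what their function emitted.
            throw new IllegalArgumentException(
                "Expected a record of type " + expectedType.getName() + " but got " + actual);
        }
        downstream.accept(expectedType.cast(record));
    }
}
```

In the PR's setting the expected type would be `PyObject`; a plain `Class<T>` parameter is used here so that the sketch compiles without Jython on the classpath.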




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127114#comment-16127114
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133171218
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

ObjectOutputStreams do not fail with an IOException if the object is null,
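
The statement about null handling can be checked with plain `java.io` streams. The sketch below uses a hypothetical `NullRoundTripSketch` helper (not part of the PR) that writes a value with `ObjectOutputStream` and reads it back with `ObjectInputStream`; a null reference goes through without an `IOException` and comes back as null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical helper used only to demonstrate the behaviour.
final class NullRoundTripSketch {

    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value); // accepts a null reference
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```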




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127110#comment-16127110
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133170150
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python UDF {@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it internally in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * This function is used internally by the Python thin wrapper layer over the streaming data
+ * functionality
+ */
+public class PythonOutputSelector implements OutputSelector {
+   private static final long serialVersionUID = 909266346633598177L;
+
+   private final byte[] serFun;
+   private transient OutputSelector fun;
+
+   public PythonOutputSelector(OutputSelector fun) throws IOException {
+   this.serFun = SerializationUtils.serializeObject(fun);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public Iterable select(PyObject value) {
+   if (this.fun == null) {
+   try {
+   this.fun = (OutputSelector) SerializationUtils.deserializeObject(this.serFun);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

Done.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127108#comment-16127108
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133169625
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
+   }
+   }
+
+   public PyObject read (Kryo kryo, Input input, Class type) {
+   int len = input.readInt();
+   byte[] serPo = new byte[len];
+   input.read(serPo);
+   PyObject po = null;
+   try {
+   po = (PyObject) SerializationUtils.deserializeObject(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

Yes, same as the answer above. To verify it, I set the returned value to 
null while debugging, and execution continued without issues.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127103#comment-16127103
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133169331
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

Yes, indeed. According to 
https://github.com/EsotericSoftware/kryo#serializers: "By default, serializers 
do not need to handle the object being null."
But anyway, I added a log message. 




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127050#comment-16127050
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133159049
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python UDF {@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it internally in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * This function is used internally by the Python thin wrapper layer over the streaming data
+ * functionality
+ */
+public class PythonOutputSelector implements OutputSelector {
+   private static final long serialVersionUID = 909266346633598177L;
+
+   private final byte[] serFun;
+   private transient OutputSelector fun;
+
+   public PythonOutputSelector(OutputSelector fun) throws IOException {
+   this.serFun = SerializationUtils.serializeObject(fun);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public Iterable select(PyObject value) {
+   if (this.fun == null) {
+   try {
+   this.fun = (OutputSelector) SerializationUtils.deserializeObject(this.serFun);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

I would throw a RuntimeException to fail the job. If the function can't be 
deserialized, something went terribly wrong during deployment.

Returning null would cause an NPE later on in `DirectedOutput`, obfuscating 
what actually caused the error.
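
A sketch of the fail-fast variant suggested above, written against hypothetical stand-ins so that it compiles without Flink or Jython: `SelectorSketch` plays the role of the `OutputSelector` interface, `EvenOddSelectorSketch` is a toy function, and `LazySelectorSketch` mirrors the lazy-deserialization pattern from the diff. None of these names come from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Collections;

// Hypothetical stand-in for an output selector interface whose select()
// declares no checked exceptions.
interface SelectorSketch<T> extends Serializable {
    Iterable<String> select(T value);
}

// Toy selector used only to exercise the wrapper.
final class EvenOddSelectorSketch implements SelectorSketch<Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<String> select(Integer value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }
}

// Keeps the wrapped function in serialized form and restores it on first use.
final class LazySelectorSketch<T> implements SelectorSketch<T> {
    private static final long serialVersionUID = 1L;

    private final byte[] serFun;
    private transient SelectorSketch<T> fun;

    LazySelectorSketch(byte[] serFun) {
        this.serFun = serFun;
    }

    static <T> LazySelectorSketch<T> of(SelectorSketch<T> fun) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fun);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to serialize the selector function", e);
        }
        return new LazySelectorSketch<>(bytes.toByteArray());
    }

    @Override
    @SuppressWarnings("unchecked")
    public Iterable<String> select(T value) {
        if (fun == null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serFun))) {
                fun = (SelectorSketch<T>) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                // Fail loudly with the real cause instead of returning null.
                throw new RuntimeException("Failed to deserialize the selector function", e);
            }
        }
        return fun.select(value);
    }
}
```

Since `RuntimeException` is unchecked, this works even though `select` has no `throws` clause, and the stack trace points at the deserialization failure rather than at a later `NullPointerException`.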




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127033#comment-16127033
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133156452
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import 
org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python 
UDF {@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it 
internally in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, 
then it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * This function is used internally by the Python thin wrapper layer 
over the streaming data
+ * functionality
+ */
+public class PythonOutputSelector implements OutputSelector<PyObject> {
+    private static final long serialVersionUID = 909266346633598177L;
+
+    private final byte[] serFun;
+    private transient OutputSelector<PyObject> fun;
+
+    public PythonOutputSelector(OutputSelector<PyObject> fun) throws IOException {
+        this.serFun = SerializationUtils.serializeObject(fun);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Iterable<String> select(PyObject value) {
+        if (this.fun == null) {
+            try {
+                this.fun = (OutputSelector<PyObject>) SerializationUtils.deserializeObject(this.serFun);
+            } catch (IOException e) {
+                e.printStackTrace();
--- End diff --

The `select` method in the `OutputSelector` interface does not declare any 
checked exceptions, so I cannot let the `IOException` propagate. That is why I 
catch the exceptions here and then check in the following lines whether 
`this.fun` is still `null`; if it is, the function returns `null`. How else 
could I handle it?
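
One common way around a method signature that declares no checked exceptions is to rethrow the failure as an unchecked exception that carries the original cause, instead of printing the stack trace and returning `null`. The sketch below is illustrative only: `Selector`, `LazySelector`, `EvenOddSelector`, and `fromBytes` are hypothetical stand-ins built on plain JDK serialization, not the Flink `OutputSelector` or the `SerializationUtils` helper from this pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;

/** Hypothetical stand-in for an interface whose method declares no checked exceptions. */
interface Selector<T> extends Serializable {
    Iterable<String> select(T value);
}

/** Keeps the wrapped selector as bytes and restores it lazily on first use. */
final class LazySelector<T> implements Selector<T> {
    private static final long serialVersionUID = 1L;

    private final byte[] serialized;
    private transient Selector<T> delegate;

    LazySelector(Selector<T> delegate) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(delegate);
        }
        this.serialized = bytes.toByteArray();
    }

    private LazySelector(byte[] serialized) {
        this.serialized = serialized;
    }

    /** Builds an instance from raw bytes; handy for simulating a corrupted payload. */
    static <T> LazySelector<T> fromBytes(byte[] serialized) {
        return new LazySelector<>(serialized);
    }

    @Override
    @SuppressWarnings("unchecked")
    public Iterable<String> select(T value) {
        if (delegate == null) {
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                delegate = (Selector<T>) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                // select() cannot declare checked exceptions, so the failure is
                // rethrown unchecked with the original exception as its cause.
                throw new IllegalStateException("Could not deserialize the wrapped selector", e);
            }
        }
        return delegate.select(value);
    }
}

/** Serializable sample selector used only for the demonstration. */
final class EvenOddSelector implements Selector<Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<String> select(Integer value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }
}
```

With this shape the caller sees a failing task with a meaningful cause rather than a `null` return value that surfaces later as an unrelated `NullPointerException`.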


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127020#comment-16127020
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133154903
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = 
"run_all_tests.py";
+   final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = 
"src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

Instead, I've provided a simplified version of it. I will send it for review.


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126899#comment-16126899
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r133132675
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new 
PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125762#comment-16125762
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132967578
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in 
a python user-defined
+ * function (UDF).
+ * The fact that this field is not set results in a dynamic calculation of this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class is created. Its name is
+ * constructed using the following pattern:
+ * {@code org.python.proxies.<module-name>$<UDF-class-name>$<number>}. The {@code <number>}
+ * part is increased by one in runtime, for every job submission. It 
results in different serial
+ * version UID for each run for the same Python class. Therefore, it is 
required to silently
+ * suppress the serial version UID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
+
+   public PythonObjectInputStream2(InputStream in) throws IOException {
+   super(in);
+   }
+
+   protected ObjectStreamClass readClassDescriptor() throws IOException, 
ClassNotFoundException {
+   ObjectStreamClass resultClassDescriptor = 
super.readClassDescriptor(); // initially streams descriptor
+
+   Class localClass;
+   try {
+   localClass = resolveClass(resultClassDescriptor);
+   } catch (ClassNotFoundException e) {
+   System.out.println("No local class for " + 
resultClassDescriptor.getName());
--- End diff --

The purpose of this function is to try to resolve the class from the given 
descriptor. If that fails, it probably means that the Jython interpreter has 
not been initialised yet, so it gets initialised as a result. This is handled in 
`org.apache.flink.streaming.python.api.functions.UtilityFunctions::smartFunctionDeserialization`.

I'm currently checking whether the `catch` here is redundant, a left-over from 
the debugging phase. If it is, we can probably let the exception propagate up 
the call stack.
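
For reference, the general technique of tolerating a serial version UID mismatch can be sketched with the plain JDK as below. This is a minimal illustration under stated assumptions, not the class from this pull request: `LenientObjectInputStream` is a hypothetical name, it extends `java.io.ObjectInputStream` rather than Jython's `PythonObjectInputStream`, and it has no `catch`, so a `ClassNotFoundException` simply propagates to the caller.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

/** Illustrative stream that tolerates serialVersionUID mismatches. */
final class LenientObjectInputStream extends ObjectInputStream {

    LenientObjectInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
        // Descriptor exactly as it was written into the stream.
        ObjectStreamClass streamDescriptor = super.readClassDescriptor();

        // No catch block: if the class cannot be resolved locally, the
        // ClassNotFoundException propagates up the call stack.
        Class<?> localClass = resolveClass(streamDescriptor);

        // lookup() returns null for classes that are not serializable.
        ObjectStreamClass localDescriptor = ObjectStreamClass.lookup(localClass);
        if (localDescriptor != null
                && localDescriptor.getSerialVersionUID() != streamDescriptor.getSerialVersionUID()) {
            // Substitute the local descriptor so the UID check performed later
            // by ObjectInputStream compares the local class with itself.
            return localDescriptor;
        }
        return streamDescriptor;
    }
}
```

Substituting the local descriptor is only safe when the field layout of the local class matches what was written, which is the situation described in the Javadoc above, where only the generated proxy class name changes between runs.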


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124952#comment-16124952
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132843982
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new 
PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124928#comment-16124928
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132842489
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = 
"run_all_tests.py";
+   final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = 
"src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

I agree that the function names here are a bit confusing. In essence, this 
function locates a single test file, while `getFilesInFolder`, called on the 
next line, collects the files whose names start with `test_`; the main test 
file `run_all_tests.py` is therefore filtered out and not included. 
To make this more readable and robust, I changed `getFilesInFolder` to take an 
additional `excludes` argument and call it with the main test file in that 
argument.
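
A minimal sketch of a folder scan with an explicit exclusion list could look like the following. It is an assumption-laden illustration rather than the code in the pull request: `TestFileCollector` and `collect` are hypothetical names, and the sketch uses `java.nio.file` instead of the Flink `FileSystem` classes that the test imports.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that collects test scripts from a folder. */
final class TestFileCollector {

    private TestFileCollector() {
    }

    /**
     * Returns the sorted names of the regular files in {@code folder} whose names
     * start with {@code prefix} and are not listed in {@code excludes}.
     */
    static List<String> collect(Path folder, String prefix, Set<String> excludes)
            throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(folder)) {
            for (Path entry : entries) {
                String name = entry.getFileName().toString();
                if (Files.isRegularFile(entry)
                        && name.startsWith(prefix)
                        && !excludes.contains(name)) {
                    names.add(name);
                }
            }
        }
        // Directory iteration order is unspecified, so sort for a stable result.
        Collections.sort(names);
        return names;
    }
}
```

Passing the main script name in `excludes` states the intent directly, so the result no longer depends on the main script happening not to match the prefix.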


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124914#comment-16124914
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132841381
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,104 @@
+
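+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.4-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-streaming-python_${scala.binary.version}</artifactId>
+    <name>flink-streaming-python</name>
+    <packaging>jar</packaging>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <mainClass>org.apache.flink.streaming.python.api.PythonStreamBinder</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.python</groupId>
+            <artifactId>jython-standalone</artifactId>
+            <version>2.7.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka-0.9_2.10</artifactId>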
--- End diff --

Done for the Scala version. 

As for not being able to run the tests:
1. I fixed the issue with the ```TypeError: object of type 
'java.lang.Class' has no len()```.
2. I still can't reproduce the main issue, which concerns importing a Java 
class from the Python module:
File: 
```flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py```
Line 19: ```from org.apache.flink.api.java.utils import ParameterTool```

The class (`ParameterTool`) resides in a different project, `flink-java`, 
and the Jython module cannot find it. It probably has something to do with the 
CLASSPATH. 

Any suggestions on how to reproduce it?
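
One way to narrow this down is to check, from the same JVM setup that runs the tests, whether the class is visible to the class loader at all. The snippet below is only a diagnostic sketch: `ClasspathProbe` is a hypothetical helper, and it assumes that the Jython import can only succeed when the JVM itself can load the class.

```java
/** Hypothetical diagnostic helper; not part of the pull request. */
final class ClasspathProbe {

    private ClasspathProbe() {
    }

    /** Returns true if the named class can be found by this class's class loader. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String className = args.length > 0
                ? args[0]
                : "org.apache.flink.api.java.utils.ParameterTool";
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
        System.out.println(className + " loadable: " + isLoadable(className));
    }
}
```

If the probe reports that the class is not loadable when run with the test classpath, the missing `flink-java` entry is the likely culprit; if it is loadable, the problem is more likely on the Jython side.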


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16049018#comment-16049018
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
When running the example against a local cluster, I got this exception:
```
java.io.IOException: java.io.IOException: The given HDFS file URI 
(hdfs:/tmp/flink_cache_-4117839671387669278) did not describe the HDFS 
NameNode. The attempt to use a default HDFS configuration, as specified in the 
'fs.hdfs.hdfsdefault' or 'fs.hdfs.hdfssite' config parameter failed due to the 
following problem: Either no default file system was registered, or the 
provided configuration contains no valid authority component (fs.default.name 
or fs.defaultFS) describing the (hdfs namenode) host and port.
```
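
The missing authority component that the message complains about can be seen by parsing the URI with the JDK. This is a small illustration only: `HdfsUriCheck` is a hypothetical helper, and `namenode:9000` is a made-up host and port used for contrast, not a value taken from this thread.

```java
import java.net.URI;

/** Hypothetical helper that shows which URIs carry an authority (host and port). */
final class HdfsUriCheck {

    private HdfsUriCheck() {
    }

    /** Returns true if the URI names an authority, e.g. "host:port". */
    static boolean hasAuthority(String uri) {
        return URI.create(uri).getAuthority() != null;
    }

    public static void main(String[] args) {
        // URI from the exception above: scheme and path only, no NameNode.
        String fromException = "hdfs:/tmp/flink_cache_-4117839671387669278";
        // Made-up fully qualified URI for comparison.
        String qualified = "hdfs://namenode:9000/tmp/flink_cache_-4117839671387669278";

        System.out.println(fromException + " has authority: " + hasAuthority(fromException));
        System.out.println(qualified + " has authority: " + hasAuthority(qualified));
    }
}
```

A URI of the first form can only be resolved when a default file system is configured, which matches the two alternatives listed in the exception text.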


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048967#comment-16048967
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121888394
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
--- End diff --

This import can be removed: `PythonStreamBinder` lives in the same package as the test.


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048970#comment-16048970
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121889539
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
+   final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
+   List<String> filesInTestPath = getFilesInFolder(testFullPath.getParent());
+
+   String[] args = new String[filesInTestPath.size() + 1];
+   args[0] = testFullPath.getAbsolutePath();
+
+   for (final ListIterator<String> it = filesInTestPath.listIterator(); it.hasNext();) {
+   final String p = it.next();
+   args[it.previousIndex() + 1] = p;
+   }
+   return args;
+   }
+
+   private static File findStreamTestFile(String name) throws Exception {
+   if (new File(name).exists()) {
+   return new File(name);
+   }
+   FileSystem fs = FileSystem.getLocalFileSystem();
+   String workingDir = fs.getWorkingDirectory().getPath();
+   if (!workingDir.endsWith(flinkPythonRltvPath)) {
+   workingDir += File.separator + flinkPythonRltvPath;
+   }
+   FileStatus[] status = fs.listStatus(
+   new Path( workingDir + File.separator + pathToStreamingTests));
+   for (FileStatus f : status) {
+   String file_name = f.getPath().getName();
--- End diff --

This variable should be named `fileName`.



[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048956#comment-16048956
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121887994
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,104 @@
+
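+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.4-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-streaming-python_${scala.binary.version}</artifactId>
+    <name>flink-streaming-python</name>
+    <packaging>jar</packaging>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <mainClass>org.apache.flink.streaming.python.api.PythonStreamBinder</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.python</groupId>
+            <artifactId>jython-standalone</artifactId>
+            <version>2.7.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka-0.9_2.10</artifactId>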
--- End diff --

Replace `2.10` with `${scala.binary.version}`.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048965#comment-16048965
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121888454
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
--- End diff --

These modifiers should be re-ordered to `private static final`.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048960#comment-16048960
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121898935
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonKeyedStream.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import org.apache.flink.streaming.python.api.functions.PythonReduceFunction;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+
+/**
+ * A thin wrapper layer over {@link KeyedStream}.
+ *
+ * A {@code PythonKeyedStream} represents a {@link PythonDataStream} on which operator state is
+ * partitioned by key using a provided {@link org.apache.flink.api.java.functions.KeySelector;}
+ */
+@Public
+public class PythonKeyedStream extends PythonDataStream<KeyedStream<PyObject, PyKey>> {
+
+   public PythonKeyedStream(KeyedStream<PyObject, PyKey> stream) {
--- End diff --

We can remove the `public` modifier from this constructor; this applies to the other variations of
`PythonDataStream` as well, except `PythonDataStream` itself.
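
To illustrate the suggestion, here is a minimal sketch of a public wrapper class whose constructor is package-private, so user code can only obtain instances through the base wrapper. The names `StreamWrappers`, `DataStreamWrapper`, `KeyedStreamWrapper` and `keyBy(String)` are hypothetical stand-ins, not the classes from the pull request.

```java
/** Hypothetical stand-ins for the wrapper hierarchy discussed above. */
final class StreamWrappers {

    /** Base wrapper: its constructor stays public. */
    public static class DataStreamWrapper {
        protected final String description;

        public DataStreamWrapper(String description) {
            this.description = description;
        }

        /** User code obtains the keyed variant here rather than via `new`. */
        public KeyedStreamWrapper keyBy(String key) {
            return new KeyedStreamWrapper(description + " keyed by " + key);
        }
    }

    /** Derived wrapper: the class is public, the constructor is not. */
    public static class KeyedStreamWrapper extends DataStreamWrapper {
        // Package-private: only code in the same package may instantiate it.
        KeyedStreamWrapper(String description) {
            super(description);
        }
    }
}
```

With this layout the derived types remain usable in signatures, while their construction stays an internal detail of the package.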




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048975#comment-16048975
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121890065
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = "run_all_tests.py";
+   final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

Since we include this in the list of scripts to run, does that mean that we 
run all tests twice?
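
Whether the duplication actually happens depends on `getFilesInFolder`, which is not shown in this diff. If the main script does end up in the list, one possible way to avoid passing it twice is to skip it while building the argument array. The sketch below assumes a hypothetical helper `TestArgs.buildArgs`; it is not code from the pull request.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: builds the argument array without repeating the main script. */
final class TestArgs {

    static String[] buildArgs(File mainScript, List<String> filesInFolder) {
        List<String> args = new ArrayList<>();
        // The main script always goes first.
        args.add(mainScript.getAbsolutePath());
        for (String candidate : filesInFolder) {
            // Skip the entry that refers to the main script itself.
            if (!new File(candidate).getAbsoluteFile().equals(mainScript.getAbsoluteFile())) {
                args.add(candidate);
            }
        }
        return args.toArray(new String[0]);
    }
}
```

Comparing absolute `File` objects rather than raw strings keeps the check independent of whether the folder listing returns relative or absolute paths.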




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048974#comment-16048974
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121901070
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.io.IOException;
+
+/**
+ * A Python deserialization schema, which implements {@link DeserializationSchema}. It converts a
+ * serialized form of {@code PyObject} into its Java Object representation.
+ */
+public class PythonDeserializationSchema implements DeserializationSchema<Object> {
+   private static final long serialVersionUID = -9180596504893036458L;
+   private final TypeInformation<Object> resultType = TypeInformation.of(new TypeHint<Object>(){});
+
+   private final byte[] serSchema;
+   private transient DeserializationSchema<Object> schema;
+
+   public PythonDeserializationSchema(DeserializationSchema<Object> schema) throws IOException {
+   this.serSchema = SerializationUtils.serializeObject(schema);
+   }
+
+   @SuppressWarnings("unchecked")
+   public Object deserialize(byte[] message) throws IOException {
+   if (this.schema == null) {
+   try {
+   this.schema = (DeserializationSchema<Object>) SerializationUtils.deserializeObject(this.serSchema);
+   } catch (ClassNotFoundException e) {
+   e.printStackTrace();
--- End diff --

We should fail right here; otherwise we get an NPE later on.
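
A minimal sketch of the fail-fast variant, using only the JDK: the lazily restored object is rebuilt on first use, and a `ClassNotFoundException` is rethrown as an `IOException` instead of being printed and ignored. The class `LazySchemaHolder` is a hypothetical stand-in for the schema wrapper in the diff, and plain Java serialization replaces the `SerializationUtils` helper, whose implementation is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a wrapper that restores its delegate lazily. */
final class LazySchemaHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] serialized;   // serialized form carried with the wrapper
    private transient Object restored; // rebuilt on first use

    LazySchemaHolder(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        this.serialized = bytes.toByteArray();
    }

    Object get() throws IOException {
        if (restored == null) {
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                restored = in.readObject();
            } catch (ClassNotFoundException e) {
                // Fail immediately with the cause attached, rather than
                // continuing with a null delegate and hitting an NPE later.
                throw new IOException("Could not restore the wrapped object", e);
            }
        }
        return restored;
    }
}
```

Because the surrounding method already declares `IOException`, wrapping the cause this way does not change its signature.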




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048976#comment-16048976
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121894570
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ *
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment get_execution_environment() {
+   return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution environment
+* will run the program in a multi-threaded fashion in the same JVM as the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048971#comment-16048971
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121891478
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition, it takes
+* care of registering the required Jython serializers.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new 
PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048963#comment-16048963
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121901283
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.util.Collector;
+import org.python.core.PyObject;
+
+/**
+ * Collects a {@code PyObject} record and forwards it. It makes sure that 
the record is converted,
+ * if necessary, to a {@code PyObject}.
+ */
+@Public
+public class PythonCollector implements Collector<PyObject> {
+   private Collector collector;
+
+   public void setCollector(Collector collector) {
+   this.collector = collector;
+   }
+
+   @Override
+   public void collect(PyObject record) {
+   PyObject po = UtilityFunctions.adapt(record);
--- End diff --

Am I missing something, or is this call unnecessary? Isn't it guaranteed 
that `record instanceof PyObject` is `true`?




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048977#comment-16048977
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121891397
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition, it takes
+* care of registering the required Jython serializers.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new 
PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048957#comment-16048957
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900689
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. It is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer<PyObject> {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
+   }
+   }
+
+   public PyObject read (Kryo kryo, Input input, Class type) {
--- End diff --

Remove the space after `read`.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048962#comment-16048962
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121893907
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition, it takes
+* care of registering the required Jython serializers.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new 
PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048955#comment-16048955
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900122
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import 
org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python 
UDF {@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it 
internally in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, 
then it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * This function is used internally by the Python thin wrapper layer 
over the streaming data
+ * functionality
+ */
+public class PythonOutputSelector implements OutputSelector {
+   private static final long serialVersionUID = 909266346633598177L;
+
+   private final byte[] serFun;
+   private transient OutputSelector fun;
+
+   public PythonOutputSelector(OutputSelector fun) throws 
IOException {
+   this.serFun = SerializationUtils.serializeObject(fun);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public Iterable select(PyObject value) {
+   if (this.fun == null) {
+   try {
+   this.fun = (OutputSelector) 
SerializationUtils.deserializeObject(this.serFun);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

We should fail here, and also in the case of a `ClassNotFoundException`.
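
To illustrate the request above, here is a minimal sketch of lazy deserialization that fails instead of printing a stack trace and continuing. It is not the PR's code: `LazyFunctionHolder`, `of`, `fromBytes` and the choice of `IllegalStateException` are hypothetical, and plain `java.io` serialization stands in for the PR's `SerializationUtils`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical holder that ships a function in serialized form and restores it on first use. */
final class LazyFunctionHolder<F extends Serializable> {
    private final byte[] serFun;
    private transient F fun;

    private LazyFunctionHolder(byte[] serFun) {
        this.serFun = serFun;
    }

    /** Serializes the function eagerly, as the wrapper in the diff does in its constructor. */
    static <F extends Serializable> LazyFunctionHolder<F> of(F fun) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fun);
        }
        return new LazyFunctionHolder<>(bytes.toByteArray());
    }

    /** Wraps already serialized bytes (used here only to demonstrate the failure path). */
    static <F extends Serializable> LazyFunctionHolder<F> fromBytes(byte[] serFun) {
        return new LazyFunctionHolder<>(serFun.clone());
    }

    /** Restores the function on first access; both failure modes abort the call. */
    @SuppressWarnings("unchecked")
    F get() {
        if (fun == null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serFun))) {
                fun = (F) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                // Fail loudly: continuing would only cause a NullPointerException later.
                throw new IllegalStateException("Could not deserialize the user function", e);
            }
        }
        return fun;
    }
}
```

With this shape, a corrupt or unreadable payload surfaces at the first call to `get()` with the original exception attached as the cause, rather than as a later null dereference.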




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048973#comment-16048973
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900761
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. It is used by the Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer<PyObject> {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

Is it intended to silently continue here?
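
For comparison, a sketch of a length-prefixed write that propagates the failure instead of swallowing it. The method signature declares no checked exception, much as a Kryo `Serializer#write` does not, so the `IOException` is rethrown unchecked. `LengthPrefixedWriter` is a hypothetical stand-in that uses only `java.io`; in a real Kryo serializer a `KryoException` or another runtime exception might be the more natural choice.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Hypothetical writer that emits a record as [int length][payload bytes]. */
final class LengthPrefixedWriter {
    private LengthPrefixedWriter() {
    }

    /** Serializes the value first, so nothing is written to the output if serialization fails. */
    static void write(DataOutputStream output, Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            byte[] payload = bytes.toByteArray();
            output.writeInt(payload.length);
            output.write(payload);
        } catch (IOException e) {
            // Rethrow unchecked so the caller (and ultimately the job) sees the failure.
            throw new UncheckedIOException("Failed to serialize record", e);
        }
    }
}
```

Because the payload is built before the length is written, a failed record leaves the output untouched, which avoids a half-written frame that the matching `read` would misinterpret.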




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048972#comment-16048972
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121899652
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in 
a python user-defined
+ * function (UDF).
+ * The fact that this field is not set results in a dynamic calculation of this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 
64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python 
class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class 
is created. Its name is
+ * constructed using the following pattern:
+ * {@code 
org.python.proxies.$$}. The {@code 
}
+ * part is increased by one in runtime, for every job submission. It 
results in different serial
+ * version UID for each run for the same Python class. Therefore, it is 
required to silently
+ * suppress the serial version UID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
+
+   public PythonObjectInputStream2(InputStream in) throws IOException {
+   super(in);
+   }
+
+   protected ObjectStreamClass readClassDescriptor() throws IOException, 
ClassNotFoundException {
+   ObjectStreamClass resultClassDescriptor = 
super.readClassDescriptor(); // initially streams descriptor
+
+   Class localClass;
+   try {
+   localClass = resolveClass(resultClassDescriptor);
+   } catch (ClassNotFoundException e) {
+   System.out.println("No local class for " + 
resultClassDescriptor.getName());
--- End diff --

Remove `println`. If an exception is thrown, what exactly does that mean to 
the job execution? I.e., why don't we fail here completely?
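
One common way to tolerate a `serialVersionUID` mismatch while still failing on a missing class is sketched below. This is an assumption about how such a stream could look, not the remainder of the PR's `PythonObjectInputStream2`; `LenientObjectInputStream` is a hypothetical name and it extends plain `ObjectInputStream` to stay self-contained.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

/** Hypothetical stream that ignores serialVersionUID differences but not missing classes. */
final class LenientObjectInputStream extends ObjectInputStream {

    LenientObjectInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
        // Descriptor exactly as it was written into the stream.
        ObjectStreamClass streamDescriptor = super.readClassDescriptor();

        // Deliberately no catch block: a ClassNotFoundException propagates and aborts the read.
        Class<?> localClass = resolveClass(streamDescriptor);

        // lookup returns null for classes that are not serializable.
        ObjectStreamClass localDescriptor = ObjectStreamClass.lookup(localClass);
        if (localDescriptor != null
                && localDescriptor.getSerialVersionUID() != streamDescriptor.getSerialVersionUID()) {
            // Substitute the local descriptor so the version check is effectively skipped.
            return localDescriptor;
        }
        return streamDescriptor;
    }
}
```

Substituting the local descriptor only helps when the two class versions have compatible fields. That holds for the regenerated proxy classes described in the Javadoc above, but it is not a general guarantee.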




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048958#comment-16048958
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121889286
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = 
"run_all_tests.py";
+   final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = 
"src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
+   List filesInTestPath = 
getFilesInFolder(testFullPath.getParent());
+
+   String[] args = new String[filesInTestPath.size() + 1];
+   args[0] = testFullPath.getAbsolutePath();
+
+   for (final ListIterator it = 
filesInTestPath.listIterator(); it.hasNext();) {
+   final String p = it.next();
+   args[it.previousIndex() + 1] = p;
+   }
+   return args;
+   }
+
+   private static File findStreamTestFile(String name) throws Exception {
+   if (new File(name).exists()) {
+   return new File(name);
+   }
+   FileSystem fs = FileSystem.getLocalFileSystem();
+   String workingDir = fs.getWorkingDirectory().getPath();
+   if (!workingDir.endsWith(flinkPythonRltvPath)) {
+   workingDir += File.separator + flinkPythonRltvPath;
+   }
+   FileStatus[] status = fs.listStatus(
+   new Path( workingDir + File.separator + 
pathToStreamingTests));
+   for (FileStatus f : status) {
+   String file_name = f.getPath().getName();
+   if (file_name.equals(name)) {
+   return new File(f.getPath().getPath());
+   }
+   }
+   throw new FileNotFoundException();
+   }
+
+   private static List getFilesInFolder(String path) {
--- End diff --

I would rename this to `getTestFilesInFolder` and change the signature to 
accept a `File` and return a `List`.
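
A minimal sketch of what the suggested signature could look like. The body of the original method is not visible in the quoted diff, so the `test_*.py` filter, the `String` element type, and the class name `TestFileLister` are assumptions made only for illustration:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class TestFileLister {

    // Hypothetical helper: takes the folder as a File and returns the absolute
    // paths of the Python test files in it. The "test_" prefix and ".py" suffix
    // are assumed naming rules, not taken from the pull request.
    static List<String> getTestFilesInFolder(File folder) {
        List<String> results = new ArrayList<>();
        File[] entries = folder.listFiles();
        if (entries == null) {
            // listFiles() returns null when the path is not a readable directory.
            return results;
        }
        for (File entry : entries) {
            String name = entry.getName();
            if (entry.isFile() && name.startsWith("test_") && name.endsWith(".py")) {
                results.add(entry.getAbsolutePath());
            }
        }
        // Sort so the order does not depend on the file system.
        Collections.sort(results);
        return results;
    }
}
```

Taking a `File` keeps path handling in one place, so callers no longer need to pass `testFullPath.getParent()` as a string.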



[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048961#comment-16048961
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121888620
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = 
"run_all_tests.py";
+   final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = 
"src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
--- End diff --

You should call `main` directly without referencing `this`; otherwise one 
would assume that it is a non-static method.
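
As an illustration only, a stripped-down stand-in for the test class (the name `BinderTestSketch` and the counter are hypothetical) showing the unqualified call:

```java
final class BinderTestSketch {
    // Counter used only to make the call observable in this sketch.
    static int invocations = 0;

    // Stand-in for the static entry point shown in the diff.
    public static void main(String[] args) throws Exception {
        invocations++;
    }

    // Instance method, like testProgram() in the diff.
    public void testProgram() throws Exception {
        // Unqualified call: it is obvious that no instance state is involved.
        main(new String[]{});
    }
}
```

`this.main(...)` compiles as well, but the Java compiler resolves it statically against the class, so the `this` reference adds nothing and only misleads the reader.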


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048966#comment-16048966
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121888058
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,104 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.4-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_${scala.binary.version}
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   
jar-with-dependencies
+   
+   
+   
+   
true
+   
org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   
flink-runtime_${scala.binary.version}
+   ${project.version}
+provided
+   
+   
+   org.python
+   jython-standalone
+   2.7.0
+   
+   
+   org.apache.flink
+   flink-connector-kafka-0.9_2.10
+   1.2.0
--- End diff --

replace version with `${project.version}`




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048968#comment-16048968
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900972
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+/**
+ * A Python serialization schema, which implements {@link 
SerializationSchema}. It converts
+ * a {@code PyObject} into its serialized form.
+ */
+public class PythonSerializationSchema implements 
SerializationSchema {
+   private static final long serialVersionUID = -9170596504893036458L;
+
+   private final byte[] serSchema;
+   private transient SerializationSchema schema;
+
+   public PythonSerializationSchema(SerializationSchema schema) 
throws IOException {
+   this.serSchema = SerializationUtils.serializeObject(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(PyObject element) {
+   if (this.schema == null) {
+   try {
+   this.schema = 
(SerializationSchema)SerializationUtils.deserializeObject(this.serSchema);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

we should fail right here, as otherwise we get an NPE later on anyway.
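
A rough sketch of the fail-fast variant, using plain `java.io` serialization instead of the PR's `SerializationUtils` so that it is self-contained. The class `LazySchemaHolder`, its factory methods, and the choice of `IllegalStateException` are assumptions for illustration, not code from the pull request:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class LazySchemaHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] serialized;
    // Rebuilt lazily after the holder itself has been shipped to a worker.
    private transient Object schema;

    private LazySchemaHolder(byte[] serialized) {
        this.serialized = serialized;
    }

    static LazySchemaHolder of(Serializable schema) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(schema);
        }
        return new LazySchemaHolder(bytes.toByteArray());
    }

    // Demo-only factory to simulate a corrupt payload.
    static LazySchemaHolder ofRawBytes(byte[] raw) {
        return new LazySchemaHolder(raw.clone());
    }

    Object schema() {
        if (schema == null) {
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                schema = in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                // Fail at the point of the error instead of printing the stack
                // trace and hitting a NullPointerException further down.
                throw new IllegalStateException("Could not deserialize the wrapped schema", e);
            }
        }
        return schema;
    }
}
```

The caller then sees the root cause in the exception chain rather than an unrelated NPE.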




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048959#comment-16048959
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900800
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the 
Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
+   try {
+   byte[] serPo = SerializationUtils.serializeObject(po);
+   output.writeInt(serPo.length);
+   output.write(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
+   }
+   }
+
+   public PyObject read (Kryo kryo, Input input, Class type) {
+   int len = input.readInt();
+   byte[] serPo = new byte[len];
+   input.read(serPo);
+   PyObject po = null;
+   try {
+   po = (PyObject) 
SerializationUtils.deserializeObject(serPo);
+   } catch (IOException e) {
+   e.printStackTrace();
--- End diff --

is it intended to silently continue here?




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048969#comment-16048969
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121900659
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the 
Kryo serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer {
+
+   public void write (Kryo kryo, Output output, PyObject po) {
--- End diff --

remove space after `write`




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048964#comment-16048964
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r121889042
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = 
"run_all_tests.py";
+   final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
--- End diff --

by convention static final fields are upper case, as in 
`DEFAULT_PYTHON_SCRIPT_NAME`.
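
For illustration, the three fields from the diff could be renamed along these lines. Only `DEFAULT_PYTHON_SCRIPT_NAME` comes from the comment above; the other two names and the class name `BinderTestConstants` are hypothetical:

```java
final class BinderTestConstants {
    // Name taken from the review comment.
    static final String DEFAULT_PYTHON_SCRIPT_NAME = "run_all_tests.py";

    // Hypothetical names for the remaining two fields, following the same convention.
    static final String FLINK_PYTHON_RELATIVE_PATH = "flink-libraries/flink-streaming-python";
    static final String PATH_TO_STREAMING_TESTS = "src/test/python/org/apache/flink/streaming/python/api";

    private BinderTestConstants() {
        // Constants holder, not meant to be instantiated.
    }
}
```

In the test class itself the fields would stay `private`; they are package-private here only so the sketch can be exercised from outside the class.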




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048929#comment-16048929
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
I can't seem to get the tests running.

In the IDE I get this exception:
```
null
Traceback (most recent call last):
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\plan.py", line 3, in <module>
    run_all_testsae1bf92fc871d56dae4598b332a87804.main()
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\run_all_testsae1bf92fc871d56dae4598b332a87804.py", line 71, in main
    Main().run()
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\run_all_testsae1bf92fc871d56dae4598b332a87804.py", line 45, in run
    tests.append(__import__(test_module_name, globals(), locals()))
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\test_filter.py", line 25, in <module>
    from utils.python_test_base import TestBase
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\utils\python_test_base.py", line 19, in <module>
    from org.apache.flink.api.java.utils import ParameterTool
java.lang.NoClassDefFoundError: org/apache/flink/api/java/utils (wrong name: org/apache/flink/api/java/Utils)
at java.lang.ClassLoader.defineClass1(Native Method)
```

On the command-line the tests do run, but fail with this exception:
```
Submitting job ... 'test_filter'
Get execution environment
2> (50, u'hello')
null
Traceback (most recent call last):
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_-7332098182433468284\plan.py", line 3, in <module>
    run_all_testsae1bf92fc871d56dae4598b332a87804.main()
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_-7332098182433468284\run_all_testsae1bf92fc871d56dae4598b332a87804.py", line 71, in main
    Main().run()
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_-7332098182433468284\run_all_testsae1bf92fc871d56dae4598b332a87804.py", line 59, in run
    print("\n{}\n{}\n{}\n".format('#'*len(ex_type), ex_type, '#'*len(ex_type)))
TypeError: object of type 'java.lang.Class' has no len()
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 17.835 sec <<< FAILURE! - in org.apache.flink.streaming.python.api.PythonStreamBinderTest
testJob(org.apache.flink.streaming.python.api.PythonStreamBinderTest)  Time elapsed: 17.393 sec  <<< FAILURE!
java.lang.AssertionError: Error while calling the test program: null
```




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048856#comment-16048856
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
I'm gonna take a deeper look now, and play around with it a bit.

Be aware that if you rebase the branch again you will hit a myriad of 
checkstyle violations; we will have to suppress the checks for method names for 
this module.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020764#comment-16020764
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117917812
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,99 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_2.10
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   
jar-with-dependencies
+   
+   
+   
+   
true
+   
org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
--- End diff --

Otherwise we include them in the jar-with-dependencies and have them 
multiple times on the classpath.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020762#comment-16020762
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117917576
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,99 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_2.10
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   
jar-with-dependencies
+   
+   
+   
+   
true
+   
org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
--- End diff --

argh, let me try again: They should be marked as `provided`.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020414#comment-16020414
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
For the latest change I rebased locally on top of origin/master, so I had to 
force-push (`git push -f`) to the master branch of my fork.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020364#comment-16020364
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117864592
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,99 @@
+
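+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-libraries</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-python_2.10</artifactId>
+	<name>flink-streaming-python</name>
+	<packaging>jar</packaging>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+					<archive>
+						<manifest>
+							<addClasspath>true</addClasspath>
+							<mainClass>org.apache.flink.streaming.python.api.PythonStreamBinder</mainClass>
+						</manifest>
+					</archive>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+
+
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>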
--- End diff --

I'm not sure what you mean. Please explain.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019916#comment-16019916
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117807878
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -233,6 +234,20 @@ boolean holdsStillReference(String name, JobID jobId) {
//  Utilities
// 

 
+	/**
+	 * Remove a given path recursively if exists. The path can be a filepath or directory.
+	 *
+	 * @param path  The root path to remove.
+	 * @throws IOException
+	 * @throws URISyntaxException
+	 */
+	public static void clearPath(String path) throws IOException, URISyntaxException {
--- End diff --

My bad! I don't use it anymore.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019906#comment-16019906
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117806756
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+   private final StreamExecutionEnvironment env;
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+
+   /**
+* A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new PythonStreamExecutionEnvironment();
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019874#comment-16019874
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117802052
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in 
a python user-defined
+ * function (UDF).
+ * The fact the this field is not set, results in a dynamic calculation 
of this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 
64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python 
class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class 
is created. Its name is
+ * constructed using the following pattern:
+ * {@code 
org.python.proxies.$$}. The {@code 
}
+ * part is increased by one in runtime, for every job submission. It 
results in different serial
+ * version UID for each run for the same Python class. Therefore, it is 
required to silently
+ * suppress the serial version UID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
--- End diff --

Ah, I didn't see that; I figured it was a leftover from development.
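
For readers following the review, here is a minimal sketch of the technique the Javadoc above describes: reading serialized objects while ignoring a `serialVersionUID` mismatch. It assumes plain `java.io` rather than Jython's `PythonObjectInputStream`. The names `SerialUidSketch`, `LenientObjectInputStream` and `Payload` are hypothetical, and this is not the code from the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Demo harness; every name in this class is made up for illustration. */
class SerialUidSketch {

    /** Hypothetical stand-in for PythonObjectInputStream2, built on plain java.io only. */
    static class LenientObjectInputStream extends ObjectInputStream {

        LenientObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected ObjectStreamClass readClassDescriptor()
                throws IOException, ClassNotFoundException {
            // Descriptor exactly as the sender wrote it, including the sender's UID.
            ObjectStreamClass streamDesc = super.readClassDescriptor();
            Class<?> localClass;
            try {
                localClass = Class.forName(streamDesc.getName());
            } catch (ClassNotFoundException e) {
                // Not known locally: return the stream descriptor and let resolution fail.
                return streamDesc;
            }
            ObjectStreamClass localDesc = ObjectStreamClass.lookup(localClass);
            if (localDesc != null
                    && localDesc.getSerialVersionUID() != streamDesc.getSerialVersionUID()) {
                // Substitute the local descriptor so the later UID comparison succeeds.
                return localDesc;
            }
            return streamDesc;
        }
    }

    /** Payload without an explicit serialVersionUID, so the JVM computes one. */
    static class Payload implements Serializable {
        final String text;

        Payload(String text) {
            this.text = text;
        }
    }

    /** Serializes a Payload, then flips one bit of the UID stored in the byte stream. */
    static byte[] serializeWithForeignUid(String text) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(new Payload(text));
        }
        byte[] bytes = buffer.toByteArray();
        // Layout: 4-byte header, TC_OBJECT, TC_CLASSDESC, 2-byte name length, name, 8-byte UID.
        int nameLength = ((bytes[6] & 0xFF) << 8) | (bytes[7] & 0xFF);
        bytes[8 + nameLength] ^= 0x01;
        return bytes;
    }

    /** Reads with the stock stream, which enforces the UID check. */
    static String readStrict(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ((Payload) in.readObject()).text;
        }
    }

    /** Reads with the lenient stream, which tolerates the UID difference. */
    static String readLenient(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LenientObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ((Payload) in.readObject()).text;
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serializeWithForeignUid("hello");
        try {
            readStrict(bytes);
        } catch (InvalidClassException e) {
            System.out.println("strict read rejected the stream: " + e.getMessage());
        }
        System.out.println("lenient read: " + readLenient(bytes));
    }
}
```

Swapping in the local descriptor is only reasonable when the class layout is known to be identical on both sides, which is the situation the Javadoc describes for Jython proxy classes. For ordinary Java classes, declaring an explicit `serialVersionUID` is the usual choice.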




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019811#comment-16019811
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117793843
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+   private final StreamExecutionEnvironment env;
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+
+   /**
+* A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new PythonStreamExecutionEnvironment();
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019776#comment-16019776
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117788892
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in 
a python user-defined
+ * function (UDF).
+ * The fact the this field is not set, results in a dynamic calculation 
of this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 
64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python 
class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class 
is created. Its name is
+ * constructed using the following pattern:
+ * {@code 
org.python.proxies.$$}. The {@code 
}
+ * part is increased by one in runtime, for every job submission. It 
results in different serial
+ * version UID for each run for the same Python class. Therefore, it is 
required to silently
+ * suppress the serial version UID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
--- End diff --

What would you suggest calling it? It extends `PythonObjectInputStream`.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019497#comment-16019497
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724977
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+   private final StreamExecutionEnvironment env;
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+
+   /**
+* A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new PythonStreamExecutionEnvironment();
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019494#comment-16019494
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724247
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import 
org.apache.flink.streaming.python.api.functions.PythonFilterFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonKeySelector;
+import org.apache.flink.streaming.python.api.functions.PythonMapFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonOutputSelector;
+import org.apache.flink.streaming.python.api.functions.PythonSinkFunction;
+import 
org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+
+/**
+ * A {@code PythonDataStream} is a thin wrapper layer over {@link 
DataStream}, which represents a
+ * stream of elements of the same type. A {@code PythonDataStream} can be 
transformed into
+ * another {@code PythonDataStream} by applying various transformation 
functions, such as
+ * 
+ * {@link PythonDataStream#map}
+ * {@link PythonDataStream#split}
+ * 
+ *
+ * A thin wrapper layer means that the functionality itself is 
performed by the
+ * {@link DataStream}, however instead of working directly with the 
streaming data sets,
+ * this layer handles Python wrappers (e.g. {@code PythonDataStream}) to 
comply with the
+ * Python standard coding styles.
+ */
+@Public
+public class PythonDataStream<D extends DataStream<PyObject>> {
+   protected final D stream;
+
+   public PythonDataStream(D stream) {
+   this.stream = stream;
+   }
+
+   /**
+* A thin wrapper layer over {@link DataStream#union(DataStream[])}.
+*
+* @param streams
+*The Python DataStreams to union output with.
+* @return The {@link PythonDataStream}.
+*/
+	@SafeVarargs
+	@SuppressWarnings("unchecked")
+	public final PythonDataStream union(PythonDataStream... streams) {
+		ArrayList<DataStream<PyObject>> dsList = new ArrayList<>();
+		for (PythonDataStream ps : streams) {
+			dsList.add(ps.stream);
+		}
+		DataStream<PyObject>[] dsArray = new DataStream[dsList.size()];
+		return new PythonDataStream(stream.union(dsList.toArray(dsArray)));
+	}
+
+   /**
+* A thin wrapper layer over {@link DataStream#split(OutputSelector)}.
+*
+* @param output_selector
+*The user defined {@link OutputSelector} for directing the 
tuples.
+* @return The {@link 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019498#comment-16019498
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117723487
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,99 @@
+
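+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-libraries</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-python_2.10</artifactId>
+	<name>flink-streaming-python</name>
+	<packaging>jar</packaging>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+					<archive>
+						<manifest>
+							<addClasspath>true</addClasspath>
+							<mainClass>org.apache.flink.streaming.python.api.PythonStreamBinder</mainClass>
+						</manifest>
+					</archive>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+
+
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>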
--- End diff --

all flink dependencies should be marked as `<scope>provided</scope>`.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019484#comment-16019484
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117722682
  
--- Diff: docs/dev/stream/python.md ---
@@ -0,0 +1,649 @@
+---
+title: "Python Programming Guide (Streaming)"
+is_beta: true
+nav-title: Python API
+nav-parent_id: streaming
+nav-pos: 63
+---
+
+
+Analysis streaming programs in Flink are regular programs that implement transformations on
+streaming data sets (e.g., filtering, mapping, joining, grouping). The streaming data sets are initially
+created from certain sources (e.g., by reading from Apache Kafka, or reading files, or from collections).
+Results are returned via sinks, which may for example write the data to (distributed) files, or to
+standard output (for example the command line terminal). Flink streaming programs run in a variety
+of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or
+on clusters of many machines.
+
+In order to create your own Flink streaming program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Jython Framework
+---
+Flink Python streaming API uses Jython framework (see 
)
+to drive the execution of a given script. The Python streaming layer, is actually a thin wrapper layer for the
+existing Java streaming APIs.
+
+ Constraints
+There are two main constraints for using Jython:
+
+* The latest Python supported version is 2.7
+* It is not straightforward to use Python C extensions
+
+Streaming Program Example
+-
+The following streaming program is a complete, working example of WordCount. You can copy & paste the code
+to run it locally (see notes later in this section). It counts the number of each word (case insensitive)
+in a stream of sentences, on a window size of 50 milliseconds and prints the results into the standard output.
+
+{% highlight python %}
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.python.api.jython import PythonStreamExecutionEnvironment
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+
+class Generator(SourceFunction):
+    def __init__(self, num_iters):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            ctx.collect('Hello World')
+            counter += 1
+
+    def cancel(self):
+        self._running = False
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+def main():
+    env = PythonStreamExecutionEnvironment.get_execution_environment()
+    env.create_python_source(Generator(num_iters=1000)) \
+        .flat_map(Tokenizer()) \
+        .key_by(Selector()) \
+        .time_window(milliseconds(50)) \
+        .reduce(Sum()) \
+        .print()
+    env.execute()
+
+
+if __name__ == '__main__':
+    main()
+{% endhighlight %}
+
+**Notes:**
+
+- If execution is done on a local cluster, you may replace the last line in the `main()` function
+  with **`env.execute(True)`**
+- Execution on a multi-node cluster requires a shared medium storage, which needs to be configured (e.g. HDFS)
+  upfront.
+- The output of the given script is directed to the standard output. Consequently, the output
+  is written to the corresponding worker `.out` filed. If the script is executed inside the IntelliJ IDE,
--- End diff --

typo: filed -> file
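
For readers following the quoted WordCount example: what the pipeline computes can be sanity-checked without Flink or Jython. The following is a minimal plain-Python sketch, not the PR's code. It assumes all sentences fall into a single window (the real job groups by 50 ms time windows, so per-window counts will differ), and the helper names `tokenize`, `reduce_by_key` and `word_count` are hypothetical, chosen only to mirror `Tokenizer`, `Selector` and `Sum` from the diff.

```python
def tokenize(sentence):
    """Mirror Tokenizer.flatMap: emit one (1, word) pair per lower-cased word."""
    return [(1, word) for word in sentence.lower().split()]


def reduce_by_key(pairs):
    """Mirror key_by(Selector()) followed by reduce(Sum()): sum counts per word."""
    totals = {}
    for count, word in pairs:
        if word in totals:
            previous_count, _ = totals[word]
            totals[word] = (previous_count + count, word)
        else:
            totals[word] = (count, word)
    return totals


def word_count(sentences):
    """Run the whole sketch over a finite list of sentences (one window assumed)."""
    pairs = []
    for sentence in sentences:
        pairs.extend(tokenize(sentence))
    return reduce_by_key(pairs)


# Three copies of the sentence that the Generator source emits.
result = word_count(['Hello World'] * 3)
```

Here `result` maps each word to a `(count, word)` tuple in the same shape that `Sum.reduce` returns, so both `hello` and `world` end up with a count of 3.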



[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019486#comment-16019486
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117723303
  
--- Diff: flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py ---
@@ -16,12 +16,12 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 

-from flink.plan.Environment import get_environment
-from flink.functions.MapFunction import MapFunction
+from flink.functions.Aggregation import Max, Min, Sum
+from flink.functions.CoGroupFunction import CoGroupFunction
 from flink.functions.CrossFunction import CrossFunction
 from flink.functions.JoinFunction import JoinFunction
-from flink.functions.CoGroupFunction import CoGroupFunction
-from flink.functions.Aggregation import Max, Min, Sum
+from flink.functions.MapFunction import MapFunction
+from flink.plan.Environment import get_environment
--- End diff --

should be reverted




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019496#comment-16019496
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724239
  
--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import org.apache.flink.streaming.python.api.functions.PythonFilterFunction;
+import org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonKeySelector;
+import org.apache.flink.streaming.python.api.functions.PythonMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonOutputSelector;
+import org.apache.flink.streaming.python.api.functions.PythonSinkFunction;
+import org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+
+/**
+ * A {@code PythonDataStream} is a thin wrapper layer over {@link DataStream}, which represents a
+ * stream of elements of the same type. A {@code PythonDataStream} can be transformed into
+ * another {@code PythonDataStream} by applying various transformation functions, such as
+ * <ul>
+ * <li>{@link PythonDataStream#map}
+ * <li>{@link PythonDataStream#split}
+ * </ul>
+ *
+ * A thin wrapper layer means that the functionality itself is performed by the
+ * {@link DataStream}, however instead of working directly with the streaming data sets,
+ * this layer handles Python wrappers (e.g. {@code PythonDataStream}) to comply with the
+ * Python standard coding styles.
+ */
+@Public
+public class PythonDataStream<D extends DataStream<PyObject>> {
+	protected final D stream;
+
+	public PythonDataStream(D stream) {
+		this.stream = stream;
+	}
+
+	/**
+	 * A thin wrapper layer over {@link DataStream#union(DataStream[])}.
+	 *
+	 * @param streams
+	 *            The Python DataStreams to union output with.
+	 * @return The {@link PythonDataStream}.
+	 */
+	@SafeVarargs
+	@SuppressWarnings("unchecked")
+	public final PythonDataStream union(PythonDataStream... streams) {
+		ArrayList dsList = new ArrayList<>();
+		for (PythonDataStream ps : streams) {
+			dsList.add(ps.stream);
+		}
+		DataStream[] dsArray = new DataStream[dsList.size()];
+		return new PythonDataStream(stream.union(dsList.toArray(dsArray)));
+	}
+
+	/**
+	 * A thin wrapper layer over {@link DataStream#split(OutputSelector)}.
+	 *
+	 * @param output_selector
+	 *            The user defined {@link OutputSelector} for directing the tuples.
+* @return The {@link 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019500#comment-16019500
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724828
  
--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ *
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+	private final StreamExecutionEnvironment env;
+	private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
--- End diff --

static fields should be above non-static fields.



[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019495#comment-16019495
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117725620
  
--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ *
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+	private final StreamExecutionEnvironment env;
+	private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+
+	/**
+	 * A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+	 * care for required Jython serializers registration.
+	 *
+	 * @return The python execution environment of the context in which the program is
+	 * executed.
+	 */
+	public static PythonStreamExecutionEnvironment get_execution_environment() {
+		return new PythonStreamExecutionEnvironment();
+	}
+
+	/**
+	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
+	 * will run the program in a multi-threaded fashion in the same JVM as the
+	 * environment was created in. The default parallelism of the local
+	 * environment is the number of hardware contexts (CPU cores / threads),
+	 * unless it was specified differently by {@link #setParallelism(int)}.
+	 *
+	 * @param configuration
+*  

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019492#comment-16019492
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724857
  
--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ *
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@Public
--- End diff --

mark as `@PublicEvolving`.



[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019490#comment-16019490
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117722927
  
--- Diff: docs/dev/stream/python.md ---
@@ -0,0 +1,649 @@
+---
+title: "Python Programming Guide (Streaming)"
+is_beta: true
+nav-title: Python API
+nav-parent_id: streaming
+nav-pos: 63
+---
+
+
+Analysis streaming programs in Flink are regular programs that implement transformations on
+streaming data sets (e.g., filtering, mapping, joining, grouping). The streaming data sets are initially
+created from certain sources (e.g., by reading from Apache Kafka, or reading files, or from collections).
+Results are returned via sinks, which may for example write the data to (distributed) files, or to
+standard output (for example the command line terminal). Flink streaming programs run in a variety
+of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or
+on clusters of many machines.
+
+In order to create your own Flink streaming program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Jython Framework
+---
+Flink Python streaming API uses Jython framework (see 
)
+to drive the execution of a given script. The Python streaming layer, is actually a thin wrapper layer for the
+existing Java streaming APIs.
+
+ Constraints
+There are two main constraints for using Jython:
+
+* The latest Python supported version is 2.7
+* It is not straightforward to use Python C extensions
+
+Streaming Program Example
+-
+The following streaming program is a complete, working example of WordCount. You can copy & paste the code
+to run it locally (see notes later in this section). It counts the number of each word (case insensitive)
+in a stream of sentences, on a window size of 50 milliseconds and prints the results into the standard output.
+
+{% highlight python %}
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.python.api.jython import PythonStreamExecutionEnvironment
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+
+class Generator(SourceFunction):
+    def __init__(self, num_iters):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            ctx.collect('Hello World')
+            counter += 1
+
+    def cancel(self):
+        self._running = False
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+def main():
+    env = PythonStreamExecutionEnvironment.get_execution_environment()
+    env.create_python_source(Generator(num_iters=1000)) \
+        .flat_map(Tokenizer()) \
+        .key_by(Selector()) \
+        .time_window(milliseconds(50)) \
+        .reduce(Sum()) \
+        .print()
+    env.execute()
+
+
+if __name__ == '__main__':
+    main()
+{% endhighlight %}
+
+**Notes:**
+
+- If execution is done on a local cluster, you may replace the last line in the `main()` function
+  with **`env.execute(True)`**
+- Execution on a multi-node cluster requires a shared medium storage, which needs to be configured (e.g. HDFS)
+  upfront.
+- The output of the given script is directed to the standard output. Consequently, the output
+  is written to the corresponding worker `.out` filed. If the script is executed inside the IntelliJ IDE,
+  then the output will be displayed in the console tab.
+
+{% top %}
+
+Program 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019485#comment-16019485
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117723282
  
--- Diff: 
flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
 ---
@@ -15,18 +15,18 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 

-from flink.plan.Environment import get_environment
-from flink.functions.MapFunction import MapFunction
-from flink.functions.FlatMapFunction import FlatMapFunction
+import struct
 from flink.functions.FilterFunction import FilterFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.MapFunction import MapFunction
--- End diff --

should be reverted.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019488#comment-16019488
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724604
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in a python user-defined
+ * function (UDF).
+ * The fact that this field is not set results in a dynamic calculation of this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class is created. Its name is
+ * constructed using the following pattern:
+ * {@code org.python.proxies.$$}. The {@code }
+ * part is increased by one at runtime, for every job submission. It results in a different serial
+ * version UID for each run for the same Python class. Therefore, it is required to silently
+ * suppress the serial version UID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
--- End diff --

remove `2` from name.
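
For context, the Javadoc in this diff describes suppressing the serial version UID mismatch check. One common way to do that in plain Java is to override `ObjectInputStream#readClassDescriptor` and fall back to the locally computed descriptor. The sketch below illustrates that general technique only; it is not the code from the pull request, and the names `LenientDeserializationSketch`, `LenientObjectInputStream`, `Payload` and `roundTripWithForeignUid` are hypothetical. It also resolves classes with `Class.forName`, which is an assumption; the Jython proxy classes would likely need a dedicated class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class LenientDeserializationSketch {

    /** Input stream that prefers the local class descriptor when UIDs differ. */
    static final class LenientObjectInputStream extends ObjectInputStream {
        LenientObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected ObjectStreamClass readClassDescriptor()
                throws IOException, ClassNotFoundException {
            // Descriptor as written by the sender, including its serialVersionUID.
            ObjectStreamClass fromStream = super.readClassDescriptor();
            Class<?> localClass;
            try {
                localClass = Class.forName(fromStream.getName());
            } catch (ClassNotFoundException e) {
                return fromStream; // let the default machinery report it
            }
            ObjectStreamClass local = ObjectStreamClass.lookup(localClass);
            if (local != null
                    && local.getSerialVersionUID() != fromStream.getSerialVersionUID()) {
                return local; // silently skip the UID mismatch check
            }
            return fromStream;
        }
    }

    /** Sample payload with an explicit UID so the demo can tamper with it. */
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;

        Payload(String text) {
            this.text = text;
        }
    }

    /** Serializes a Payload, alters the UID in the byte stream, reads it back. */
    static String roundTripWithForeignUid(String text) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(new Payload(text));
            }
            byte[] bytes = buffer.toByteArray();
            // Layout: magic(2) version(2) TC_OBJECT(1) TC_CLASSDESC(1)
            //         nameLength(2) name(n) serialVersionUID(8) ...
            int nameLength = ((bytes[6] & 0xFF) << 8) | (bytes[7] & 0xFF);
            bytes[8 + nameLength + 7] ^= 0x7F; // pretend the writer had another UID
            try (ObjectInputStream in =
                    new LenientObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ((Payload) in.readObject()).text;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripWithForeignUid("Hello World"));
    }
}
```

A plain `ObjectInputStream` would reject the altered stream with an `InvalidClassException`, so this approach trades that safety check for tolerance of regenerated proxy class UIDs.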




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019489#comment-16019489
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117725288
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world
+ * (data access).
+ */
+@Public
+public class PythonStreamExecutionEnvironment {
+   private final StreamExecutionEnvironment env;
+   private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+
+   /**
+    * A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+    * care for required Jython serializers registration.
+    *
+    * @return The python execution environment of the context in which the program is
+    * executed.
+    */
+   public static PythonStreamExecutionEnvironment get_execution_environment() {
+   return new PythonStreamExecutionEnvironment();
+   }
+
+   /**
+    * Creates a {@link LocalStreamEnvironment}. The local execution environment
+    * will run the program in a multi-threaded fashion in the same JVM as the
+    * environment was created in. The default parallelism of the local
+    * environment is the number of hardware contexts (CPU cores / threads),
+    * unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019493#comment-16019493
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117726359
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -233,6 +234,20 @@ boolean holdsStillReference(String name, JobID jobId) {
//  Utilities
// 

 
+   /**
+    * Remove a given path recursively if exists. The path can be a filepath or directory.
+    *
+    * @param path  The root path to remove.
+    * @throws IOException
+    * @throws URISyntaxException
+    */
+   public static void clearPath(String path) throws IOException, URISyntaxException {
--- End diff --

I thought we decided against adding this method?
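
For reference, the behaviour described in that Javadoc (remove a path recursively if it exists, whether it is a file or a directory) can be sketched with `java.nio.file` alone. This is only an illustration under the assumption of a local file system; the method in the diff takes a `String` and declares `URISyntaxException`, so it presumably goes through Flink's own `FileSystem` abstraction, which this sketch does not attempt to reproduce. `ClearPathSketch` and `demo` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical local-file-system variant; not the FileCache method under review.
final class ClearPathSketch {

    /** Removes {@code root} recursively if it exists; does nothing otherwise. */
    static void clearPath(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                    throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException failure)
                    throws IOException {
                if (failure != null) {
                    throw failure;
                }
                Files.delete(dir); // children were deleted before this callback
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /** Builds a small temporary tree, clears it, and reports whether it is gone. */
    static boolean demo() {
        try {
            Path root = Files.createTempDirectory("clear-path-sketch");
            Files.createDirectories(root.resolve("a").resolve("b"));
            Files.write(root.resolve("a").resolve("b").resolve("file.txt"), new byte[] {1, 2, 3});
            clearPath(root);
            clearPath(root); // second call hits the "does not exist" branch
            return !Files.exists(root);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("removed: " + demo());
    }
}
```

Because `walkFileTree` also accepts a regular file as its start path, the same method covers both the file and the directory case.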




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019491#comment-16019491
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117724303
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import org.apache.flink.streaming.python.api.functions.PythonFilterFunction;
+import org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonKeySelector;
+import org.apache.flink.streaming.python.api.functions.PythonMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonOutputSelector;
+import org.apache.flink.streaming.python.api.functions.PythonSinkFunction;
+import org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+
+/**
+ * A {@code PythonDataStream} is a thin wrapper layer over {@link DataStream}, which represents a
+ * stream of elements of the same type. A {@code PythonDataStream} can be transformed into
+ * another {@code PythonDataStream} by applying various transformation functions, such as
+ * 
+ * {@link PythonDataStream#map}
+ * {@link PythonDataStream#split}
+ * 
+ *
+ * A thin wrapper layer means that the functionality itself is performed by the
+ * {@link DataStream}, however instead of working directly with the streaming data sets,
+ * this layer handles Python wrappers (e.g. {@code PythonDataStream}) to comply with the
+ * Python standard coding styles.
+ */
+@Public
--- End diff --

I would mark the entire class as `@PublicEvolving`.



[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019487#comment-16019487
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117723223
  
--- Diff: flink-libraries/flink-python/pom.xml ---
@@ -59,20 +59,32 @@ under the License.
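             <groupId>org.apache.flink</groupId>
             <artifactId>flink-core</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>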
--- End diff --

These changes should be reverted, I guess.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019499#comment-16019499
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117726281
  
--- Diff: flink-libraries/pom.xml ---
@@ -52,7 +53,7 @@ under the License.

             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
-            <scope>provided</scope>
--- End diff --

This should be reverted.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002644#comment-16002644
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
It may take a while until I can review this; the 1.3 feature freeze just 
kicked in, and it's time to test all the new features in depth.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000751#comment-16000751
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi closed the pull request at:

https://github.com/apache/flink/pull/3826




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000437#comment-16000437
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3826
  
Please close this pull request.




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999873#comment-15999873
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

GitHub user zohar-mizrahi opened a pull request:

https://github.com/apache/flink/pull/3838

[FLINK-5886] Python API for streaming applications


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zohar-mizrahi/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3838


commit c1333c3424897caa615683d3494b41e7ab88d45d
Author: Zohar Mizrahi 
Date:   2016-11-15T12:46:36Z

[FLINK-5886] Python API for streaming applications






[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996958#comment-15996958
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi closed the pull request at:

https://github.com/apache/flink/pull/3827




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-04 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996955#comment-15996955
 ] 

Chesnay Schepler commented on FLINK-5886:
-

I was referring to PR #3826 that you opened. (On GitHub "#ABCD" automatically 
generates a link to a PR)



[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-04 Thread Zohar Mizrahi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996944#comment-15996944
 ] 

Zohar Mizrahi commented on FLINK-5886:
--

No problem - I'll rebase on top of master.

As for #3826 - maybe you referred to another ticket, because I'm not familiar 
with this one.



[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996762#comment-15996762
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3827
  
The distributed cache functionality was just merged. Could you rebase this 
PR on top of the current master?

Also, don't forget to close #3826 :)




[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996749#comment-15996749
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

GitHub user zohar-mizrahi opened a pull request:

https://github.com/apache/flink/pull/3827

[FLINK-5886] Python API for streaming applications

Thanks for contributing to Apache Flink. Before you open your pull request,
please take the following checklist into consideration.
If your changes take all of the items into account, feel free to open your
pull request. For more information and/or questions please refer to the
[How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zohar-mizrahi/flink python-streaming

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3827.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3827
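
As an illustration of the convention above, the following is a minimal sketch 
of how a "This closes #N" phrase in a commit message could be detected and 
mapped to a pull request URL. The helper names `closed_pull_requests` and 
`pull_request_url` are hypothetical, and the regular expression is an 
assumption; this is not the actual logic used by the ASF GitHub tooling. The 
URL pattern is the one that appears in this thread.

```python
import re

# Assumed pattern: the word "closes" followed by "#<number>", case-insensitive.
# This is an illustrative guess, not the real bot's matching rule.
CLOSES_RE = re.compile(r"\bcloses\s+#(\d+)", re.IGNORECASE)


def closed_pull_requests(commit_message):
    """Return the PR numbers named by 'closes #N' phrases, in order of appearance."""
    return [int(number) for number in CLOSES_RE.findall(commit_message)]


def pull_request_url(number, repo="apache/flink"):
    """Build the GitHub URL for a pull request number in the given repository."""
    return "https://github.com/{}/pull/{}".format(repo, number)


if __name__ == "__main__":
    message = "[FLINK-5886] Python API for streaming applications\n\nThis closes #3827"
    for number in closed_pull_requests(message):
        print(pull_request_url(number))
```

A commit message without such a phrase yields an empty list, so the sketch 
leaves unrelated commits alone.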


commit 03684e8e460143013babb2ec88c66c8fa1119c43
Author: Zohar Mizrahi 
Date:   2017-04-09T09:11:57Z

[FLINK-6177] Add support for "Distributed Cache" in streaming applications

commit 7e6374f13a1846b6982923083ee98140c37d5903
Author: Zohar Mizrahi 
Date:   2017-04-20T08:19:38Z

[FLINK-6177] Apply suggested fixes from a review

commit 8606fafff92bbcea48c64a6accf87ec2b6802b46
Author: Zohar Mizrahi 
Date:   2017-04-20T16:38:49Z

[FLINK-6177] Combine the streaming & batch distributed cache tests and 
inherit from StreamingMultipleProgramsTestBase

commit fa74b64d100e51f4a8776b613714581c585b1ddd
Author: Zohar Mizrahi 
Date:   2017-05-01T13:07:14Z

[FLINK-6177] Add missing file cache functions to the scala 
StreamExecutionEnvironment

commit cbbd86b59a4a5561fd383a704bc95c5c1c255449
Author: Zohar Mizrahi 
Date:   2016-11-15T12:46:36Z

[FLINK-5886] Python API for streaming applications






[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996746#comment-15996746
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

GitHub user zohar-mizrahi opened a pull request:

https://github.com/apache/flink/pull/3826

[FLINK-5886] Python API for streaming applications



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zohar-mizrahi/flink python-streaming

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3826.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3826


commit 7d1b9f0bb2dea719dc438cb8649f200cc6235980
Author: Robert Metzger 
Date:   2017-01-03T09:05:54Z

[FLINK-4861][hotfix] Fix change-scala-version script for opt assembly

commit a6a5b21ef8c7fdd7d073296208f47d47ca6a
Author: Sachin 
Date:   2017-01-03T10:28:11Z

[FLINK-5382][web-frontend] Fix problems with downloading TM logs on Yarn

commit 335175e6eefc260cf1600544639594d85836f7d8
Author: Ivan Mushketyk 
Date:   2016-12-16T07:56:46Z

[FLINK-5349] [docs] Fix typos in Twitter connector example
This closes #3015.

commit bb46fffe310f9cd6f667293df14d98e90011d591
Author: Abhishek R. Singh 
Date:   2016-12-14T14:05:11Z

[FLINK-5323] [docs] Replace CheckpointNotifier with CheckpointListener

This closes #3006.

commit 24109cb2692f1f0dd2b9f8c9c8dcc02e55148bab
Author: zentol 
Date:   2016-11-25T12:27:43Z

[FLINK-4870] Fix path handling in ContinuousFileMonitoringFunction

This closes #2887.

commit b50bbcc8853c1c2ebcdba9c74a70bfdfbe6557ab
Author: Boris Osipov 
Date:   2016-12-16T07:30:33Z

[FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape

This closes #3019.

commit 9c0c19aae5b78de71c91a735a76cd9196dc8482c
Author: zentol 
Date:   2016-11-25T11:51:38Z

[FLINK-5160] Fix SecurityContextTest#testCreateInsecureHadoopContext on 
Windows

This closes #2888.

commit 91f9a1acaa899159a0d907528634bd246e6854b4
Author: Stephan Ewen 
Date:   2017-01-04T23:18:13Z

[FLINK-5408] [RocksDB backend] Uniquify RocksDB JNI library path to avoid 
multiple classloader problem

commit fb48c3b4cbc5a186cb7b812c8d05833c5852b385
Author: Stephan Ewen 
Date:   2017-01-05T13:44:00Z

[FLINK-4890] [core] Make GlobFilePathFilter work on Windows

commit 3554c96d118a411906a22b1f1087de073617a4c7
Author: Stefan Richter 
Date:   2016-12-28T11:50:00Z

[FLINK-5397] [runtime] Do not replace ObjectStreamClass on deserialization 
of migration package classes, override resolveClass(...) instead

This closes #3050

commit 700cbd464345e9c180cfef58a4082b2e39d27160
Author: Stephan Ewen 
Date:   2017-01-05T16:30:37Z

[hotfix] Set default test logger back to 'OFF' in 'flink-tests'

commit 65a32e74175c026f04ab058ee2ace8c9e7012d76
Author: zentol 
Date:   2017-01-05T14:39:25Z

[FLINK-5160] SecurityUtils use OperatingSystem.getCurrentOperatingSystem()

commit aaf9612791284633727a0951d0d45292ef5e233c
Author: zentol 
Date:   2017-01-05T15:18:38Z

[FLINK-5412] Enable RocksDB tests on Windows OS

commit 9d99f2bd4a29b748905e55a774ff04f933b6b00f
Author: Alexey Diomin 
Date:   2016-07-04T15:13:11Z

[FLINK-4148] Fix min distance calculation in QuadTree

commit b93f80afc7c50c7fefc850b620bb571523343595
Author: Sachin Goel 
Date: