[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206367839
  
Thanks @nikste for implementing!

@StefanPapp We would be delighted if you tried out the new shell!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1412


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206352670
  
Adjusted the welcome message now 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206351430
  
Merging this with an adjustment of the welcome message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread StefanPapp
Github user StefanPapp commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206351050
  
I also would like to have this feature. Flink is more and more perceived as 
streaming framework that can do batch too if necessary. So having a batch 
execution environment and no stream execution environment contradicts the 
message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206350336
  
Ah, right @rmetzger. We can go ahead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206349631
  
@chiwanpark: I think you can register multiple email addresses with GitHub, 
so that they can associate all your commits (from different addresses) to you.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206345431
  
I just found the author email is not same as @nikste's github account. It 
is trivial but fixing email address would be better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206344388
  
Looks good to me. +1

I tested a simple streaming word count example with socket stream in 
following configuration:
* Flink on Hadoop YARN 2.7.2 (both Scala 2.10, 2.11)
* Flink local cluster (both Scala 2.10, 2.11)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206343969
  
+1 to merge although I still think `senv` and `benv` will confuse people. 

One more thing before merging, we need to change the initial welcome 
message which explains the usage. It still states: 
```
NOTE: Use the prebound Execution Environment "env" to read data and execute 
your program:
  * env.readTextFile("/path/to/data")
  * env.execute("Program name")
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206339995
  
I think the changes look good to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-04 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-205285378
  
As you wish, I don't mind. I only felt it was hard to read and to keep 
apart.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-04 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-205274226
  
I prefer `senv` and `benv` because it is shorter than `streamEnv` and 
`batchEnv`. In shell, shorter variables would be better even though there is a 
auto-complete support.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-04 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-205271252
  
I was following @StephanEwen 's suggestion above here:
> The Scala community seems to like brevity (senv vs. streamEnv).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-04 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-205267364
  
How about renaming `benv` to `batchEnv` and `senv` to `streamEnv`? Makes 
the two more distinct from each other and the names more self-explanatory.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-04 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-205234801
  
@mxm rebased to master and added documentation


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-04 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-205204935
  
@nikste Could you rebase to the latest master?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-04 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r58346629
  
--- Diff: 
flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala ---
@@ -124,7 +127,9 @@ class FlinkILoop(
 "org.apache.flink.api.java.operators._",
 "org.apache.flink.api.java.sampling._",
 "org.apache.flink.api.scala._",
-"org.apache.flink.api.scala.utils._"
+"org.apache.flink.api.scala.utils._",
+"org.apache.flink.streaming.api.scala._",
+"org.apache.flink.streaming.api.windowing.time._"
--- End diff --

Sounds good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-03-31 Thread nikste
Github user nikste commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r58094009
  
--- Diff: 
flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala ---
@@ -124,7 +127,9 @@ class FlinkILoop(
 "org.apache.flink.api.java.operators._",
 "org.apache.flink.api.java.sampling._",
 "org.apache.flink.api.scala._",
-"org.apache.flink.api.scala.utils._"
+"org.apache.flink.api.scala.utils._",
+"org.apache.flink.streaming.api.scala._",
+"org.apache.flink.streaming.api.windowing.time._"
--- End diff --

I've added `streaming.api.scala` and `windowing.time` any other default 
imports important for streaming?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-03-31 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-204038110
  
So since a lot has changed in the master I basically rewrote the whole 
thing. Seems to work now.

What default imports should be included for the streaming API? 

Also I noticed, that the commit history of the files is lost after moving 
the module from staging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-03-31 Thread rawkintrevo
Github user rawkintrevo commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-203953994
  
@nikste awesome work, any updates?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-02-15 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-184194014
  
@nikste Cool!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-02-15 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-184177781
  
@mxm @aljoscha i was on vacation for the last month, I'll continue working 
on this now..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-01-29 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-176847731
  
@nikste Any updates on this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-01-18 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-172493786
  
The context environment is set in the `ScalaShellRemoteEnvironment` to 
disable creation of new environments by the user. This is done when you first 
create the batch environment. Then you create the 
`ScalaShellRemoteStreamEnvironment` which internally creates the 
`StreamRemoteEnvironment` which fails because the context environment is 
already set.

You may work around this by setting the context environment (and thereby 
disabling creation of other environments) **after** you have created both of 
your shell environments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-01-18 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-172494058
  
By the way, sorry for the late response. I was traveling.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-12-21 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-166394233
  
Hey @mxm,
So I've tried to initialize the scala shell with one and two different 
miniclusters and initialize the streaming and batch environment simultaneously. 
However, for the streaming environment it throws following exception:

```java
Exception in thread "main" 
org.apache.flink.api.common.InvalidProgramException: The RemoteEnvironment 
cannot be used when submitting a program through a client, or running in a 
TestEnvironment context.
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.(RemoteStreamEnvironment.java:130)
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.(RemoteStreamEnvironment.java:101)
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.(RemoteStreamEnvironment.java:78)
at 
org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.(ScalaShellRemoteStreamEnvironment.java:58)
at org.apache.flink.api.scala.FlinkILoop.(FlinkILoop.scala:104)
at org.apache.flink.api.scala.FlinkILoop.(FlinkILoop.scala:61)
at 
org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:163)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:87)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
```
Any Idea how to get around this?
  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-12-08 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-162843154
  
Ok, preferences here seem to be clear, I'll change it to make use of two 
environments simultaneously in the shell then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-12-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-162886600
  
Great. Thank you @nikste!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-12-07 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-162804155
  
@nikste, Yes that is what I mean.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-12-07 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-162528807
  
Hi @nikste. Pinging again :) What do you think about my suggestion?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-12-07 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-162556835
  
I am a bit biased towards having two environments, like @chiwanpark and 
@mxm suggest.

We could also keep calling the batch environment `env` and the stream 
environment `senv`. The Scala community seems to like brevity (senv vs. 
streamEnv).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-12-07 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-162534231
  
Hey @mxm, my survey among two people was inconclusive. I can see both 
versions working, I'm more tending towards the version that is implemented now, 
I might be a bit biased though ;) 

@chiwanpark not sure what you are rooting for, you mean using 
```batchEnv``` and ```streamEnv``` inside the shell instead of startup 
parameters by "multiple environments during startup of the shell" ?'  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-12-07 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-162529724
  
I also prefer multiple environments during startup of the shell. I'm adding 
YARN session support of the shell. If there is another startup mode, 
implementation could be complex.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-12-02 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-161318473
  
Does it have to be an extra startup mode? How about you instantiating both 
`ExecutionEnvironment` and `StreamExecutionEnvironment` during startup of the 
shell? We could bind them to `batchEnv` and `streamEnv` respectively. I think 
it would be nice not having to figure out how which parameter to supply for the 
shell. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-11-30 Thread nikste
Github user nikste commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r46155052
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.scala.FlinkILoop;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Created by nikste on 8/12/15.
+ */
+public class ScalaShellRemoteStreamEnvironment extends 
RemoteStreamEnvironment {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ScalaShellRemoteStreamEnvironment.class);
+   // reference to Scala Shell, for access to virtual directory
+   private FlinkILoop flinkILoop;
+   /**
+* Creates a new RemoteStreamEnvironment that points to the master
+* (JobManager) described by the given host name and port.
+*
+* @param host   The host name or address of the master (JobManager), 
where the
+*   program should be executed.
+* @param port   The port of the master (JobManager), where the program 
should
+*   be executed.
+* @param jarFiles The JAR files with code that needs to be shipped to 
the
+*   cluster. If the program uses 
user-defined functions,
+*   user-defined input formats, or any 
libraries, those must be
+*/
+   public ScalaShellRemoteStreamEnvironment(String host, int port, 
FlinkILoop flinkILoop, String... jarFiles) {
+   super(host, port, jarFiles);
+   this.flinkILoop = flinkILoop;
+   }
+   /**
+* compiles jars from files in the shell virtual directory on the fly, 
sends and executes it in the remote stream environment
+*
+* @return Result of the computation
+* @throws ProgramInvocationException
+*/
+   @Override
+   public JobExecutionResult execute() throws Exception {
+   prepareJars();
+   return(super.execute());
+   }
+   /**
+* prepares the user generated code from the shell to be shipped to 
JobManager
+* (i.e. save it into jarFiles of this object)
+*/
+   private void prepareJars() throws MalformedURLException {
+   String jarFile = 
flinkILoop.writeFilesToDisk().getAbsolutePath();
+
+   // get "external jars, and add the shell command jar, pass to 
executor
+   List alljars = new ArrayList();
+   // get external (library) jars
+   String[] extJars = this.flinkILoop.getExternalJars();
+
+   if(!ArrayUtils.isEmpty(extJars)) {
+   alljars.addAll(Arrays.asList(extJars));
+   }
+   // add shell commands
+   alljars.add(jarFile);
+   String[] alljarsArr = new String[alljars.size()];
+   alljarsArr = alljars.toArray(alljarsArr);
+   for (String jarF : alljarsArr) {
+   URL fileUrl = new URL("file://" + jarF);
+   System.out.println("sending:" + fileUrl);
+   try {
+   JobWithJars.checkJarFile(fileUrl);
+ 

[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-11-30 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-160659980
  
@rmetzger, thanks for the comments. 
Changed the code accordingly, also I've extended the documentation for the 
scala shell.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-11-30 Thread nikste
Github user nikste commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r46154962
  
--- Diff: flink-staging/flink-scala-shell/pom.xml ---
@@ -84,6 +108,11 @@ under the License.
test

 
+   
+   org.apache.flink
+   flink-connector-twitter
--- End diff --

I've removed them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-11-30 Thread nikste
Github user nikste commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r46155833
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
 ---
@@ -95,24 +140,49 @@ class FlinkILoop(
 new File(tmpDirBase, "scala_shell_commands.jar")
   }
 
-  private val packageImports = Seq[String](
-"org.apache.flink.core.fs._",
-"org.apache.flink.core.fs.local._",
-"org.apache.flink.api.common.io._",
-"org.apache.flink.api.common.aggregators._",
-"org.apache.flink.api.common.accumulators._",
-"org.apache.flink.api.common.distributions._",
-"org.apache.flink.api.common.operators._",
-"org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
-"org.apache.flink.api.common.functions._",
-"org.apache.flink.api.java.io._",
-"org.apache.flink.api.java.aggregation._",
-"org.apache.flink.api.java.functions._",
-"org.apache.flink.api.java.operators._",
-"org.apache.flink.api.java.sampling._",
-"org.apache.flink.api.scala._",
-"org.apache.flink.api.scala.utils._"
-  )
+  private val packageImports =
+streaming match {
+  case StreamingMode.BATCH_ONLY => Seq[String](
+  "org.apache.flink.core.fs._",
+  "org.apache.flink.core.fs.local._",
+  "org.apache.flink.api.common.io._",
+  "org.apache.flink.api.common.aggregators._",
+  "org.apache.flink.api.common.accumulators._",
+  "org.apache.flink.api.common.distributions._",
+  "org.apache.flink.api.common.operators._",
+  
"org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
+  "org.apache.flink.api.common.functions._",
+  "org.apache.flink.api.java.io._",
+  "org.apache.flink.api.java.aggregation._",
+  "org.apache.flink.api.java.functions._",
+  "org.apache.flink.api.java.operators._",
+  "org.apache.flink.api.java.sampling._",
+  "org.apache.flink.api.scala._",
+  "org.apache.flink.api.scala.utils._"
+)
+  case StreamingMode.STREAMING => Seq[String](
+   "org.apache.flink.core.fs._",
+  "org.apache.flink.core.fs.local._",
+  "org.apache.flink.api.common.io._",
+  "org.apache.flink.api.common.aggregators._",
+  "org.apache.flink.api.common.accumulators._",
+  "org.apache.flink.api.common.distributions._",
+  "org.apache.flink.api.common.operators._",
+  
"org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
+  "org.apache.flink.api.common.functions._",
+  "org.apache.flink.api.java.io._",
+  "org.apache.flink.api.java.aggregation._",
+  "org.apache.flink.api.java.functions._",
+  "org.apache.flink.api.java.operators._",
+  "org.apache.flink.api.java.sampling._",
+  "org.apache.flink.api.scala._",
+  "org.apache.flink.api.scala.utils._",
+  "org.apache.flink.streaming._",
+  "org.apache.flink.streaming.connectors.rabbitmq._",
--- End diff --

This is copied from:

https://github.com/apache/flink/commit/c82ebbfce0b11a4b4de3126fb02ccfdad80e0837
just for the streaming case. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-11-30 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r46149356
  
--- Diff: flink-staging/flink-scala-shell/pom.xml ---
@@ -84,6 +108,11 @@ under the License.
test

 
+   
+   org.apache.flink
+   flink-connector-twitter
--- End diff --

Why are kafka, twitter and rabbitmq dependencies of the scala shell?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-11-30 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r46149305
  
--- Diff: flink-staging/flink-scala-shell/pom.xml ---
@@ -84,6 +108,11 @@ under the License.
test

 
+   
+   org.apache.flink
+   flink-connector-twitter
+   0.10-SNAPSHOT
--- End diff --

the version must be a variable


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-11-30 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r46149442
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.scala.FlinkILoop;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Created by nikste on 8/12/15.
+ */
+public class ScalaShellRemoteStreamEnvironment extends 
RemoteStreamEnvironment {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ScalaShellRemoteStreamEnvironment.class);
+   // reference to Scala Shell, for access to virtual directory
+   private FlinkILoop flinkILoop;
+   /**
+* Creates a new RemoteStreamEnvironment that points to the master
+* (JobManager) described by the given host name and port.
+*
+* @param host   The host name or address of the master (JobManager), 
where the
+*   program should be executed.
+* @param port   The port of the master (JobManager), where the program 
should
+*   be executed.
+* @param jarFiles The JAR files with code that needs to be shipped to 
the
+*   cluster. If the program uses 
user-defined functions,
+*   user-defined input formats, or any 
libraries, those must be
+*/
+   public ScalaShellRemoteStreamEnvironment(String host, int port, 
FlinkILoop flinkILoop, String... jarFiles) {
+   super(host, port, jarFiles);
+   this.flinkILoop = flinkILoop;
+   }
+   /**
+* compiles jars from files in the shell virtual directory on the fly, 
sends and executes it in the remote stream environment
+*
+* @return Result of the computation
+* @throws ProgramInvocationException
+*/
+   @Override
+   public JobExecutionResult execute() throws Exception {
+   prepareJars();
+   return(super.execute());
+   }
+   /**
+* prepares the user generated code from the shell to be shipped to 
JobManager
+* (i.e. save it into jarFiles of this object)
+*/
+   private void prepareJars() throws MalformedURLException {
+   String jarFile = 
flinkILoop.writeFilesToDisk().getAbsolutePath();
+
+   // get "external jars, and add the shell command jar, pass to 
executor
+   List alljars = new ArrayList();
+   // get external (library) jars
+   String[] extJars = this.flinkILoop.getExternalJars();
+
+   if(!ArrayUtils.isEmpty(extJars)) {
+   alljars.addAll(Arrays.asList(extJars));
+   }
+   // add shell commands
+   alljars.add(jarFile);
+   String[] alljarsArr = new String[alljars.size()];
+   alljarsArr = alljars.toArray(alljarsArr);
+   for (String jarF : alljarsArr) {
+   URL fileUrl = new URL("file://" + jarF);
+   System.out.println("sending:" + fileUrl);
+   try {
+   JobWithJars.checkJarFile(fileUrl);
+   

[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-11-30 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r46149535
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
 ---
@@ -95,24 +140,49 @@ class FlinkILoop(
 new File(tmpDirBase, "scala_shell_commands.jar")
   }
 
-  private val packageImports = Seq[String](
-"org.apache.flink.core.fs._",
-"org.apache.flink.core.fs.local._",
-"org.apache.flink.api.common.io._",
-"org.apache.flink.api.common.aggregators._",
-"org.apache.flink.api.common.accumulators._",
-"org.apache.flink.api.common.distributions._",
-"org.apache.flink.api.common.operators._",
-"org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
-"org.apache.flink.api.common.functions._",
-"org.apache.flink.api.java.io._",
-"org.apache.flink.api.java.aggregation._",
-"org.apache.flink.api.java.functions._",
-"org.apache.flink.api.java.operators._",
-"org.apache.flink.api.java.sampling._",
-"org.apache.flink.api.scala._",
-"org.apache.flink.api.scala.utils._"
-  )
+  private val packageImports =
+streaming match {
+  case StreamingMode.BATCH_ONLY => Seq[String](
+  "org.apache.flink.core.fs._",
+  "org.apache.flink.core.fs.local._",
+  "org.apache.flink.api.common.io._",
+  "org.apache.flink.api.common.aggregators._",
+  "org.apache.flink.api.common.accumulators._",
+  "org.apache.flink.api.common.distributions._",
+  "org.apache.flink.api.common.operators._",
+  
"org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
+  "org.apache.flink.api.common.functions._",
+  "org.apache.flink.api.java.io._",
+  "org.apache.flink.api.java.aggregation._",
+  "org.apache.flink.api.java.functions._",
+  "org.apache.flink.api.java.operators._",
+  "org.apache.flink.api.java.sampling._",
+  "org.apache.flink.api.scala._",
+  "org.apache.flink.api.scala.utils._"
+)
+  case StreamingMode.STREAMING => Seq[String](
+   "org.apache.flink.core.fs._",
+  "org.apache.flink.core.fs.local._",
+  "org.apache.flink.api.common.io._",
+  "org.apache.flink.api.common.aggregators._",
+  "org.apache.flink.api.common.accumulators._",
+  "org.apache.flink.api.common.distributions._",
+  "org.apache.flink.api.common.operators._",
+  
"org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
+  "org.apache.flink.api.common.functions._",
+  "org.apache.flink.api.java.io._",
+  "org.apache.flink.api.java.aggregation._",
+  "org.apache.flink.api.java.functions._",
+  "org.apache.flink.api.java.operators._",
+  "org.apache.flink.api.java.sampling._",
+  "org.apache.flink.api.scala._",
+  "org.apache.flink.api.scala.utils._",
+  "org.apache.flink.streaming._",
+  "org.apache.flink.streaming.connectors.rabbitmq._",
--- End diff --

Why this import?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2015-11-30 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-160648316
  
I had some questions for the pull request.

Also, can you update the documentation according to the changes ( 
https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_shell.html) ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---