[jira] [Commented] (FLINK-7851) Improve scheduling balance in case of fewer sub tasks than input operator

2018-04-26 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453599#comment-16453599
 ] 

Renkai Ge commented on FLINK-7851:
--

[~till.rohrmann] Is there an alternative way for users to improve the scheduling 
balance by themselves?

> Improve scheduling balance in case of fewer sub tasks than input operator
> -
>
> Key: FLINK-7851
> URL: https://issues.apache.org/jira/browse/FLINK-7851
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.5.0
>
>
> When having a job where we have a mapper {{m1}} running with dop {{n}} 
> followed by a key by and a mapper {{m2}} (all-to-all communication) which 
> runs with dop {{m}} and {{n > m}}, it happens that the sub tasks of {{m2}} 
> are not uniformly spread out across all currently used {{TaskManagers}}.
> For example: {{n = 4}}, {{m = 2}} and we have 2 TaskManagers with 2 slots 
> each. The deployment would look the following:
> TM1: 
> Slot 1: {{m1_1}} -> {{m2_1}}
> Slot 2: {{m1_3}} -> {{m2_2}}
> TM2:
> Slot 1: {{m1_2}}
> Slot 2: {{m1_4}}
> The reason for this behaviour is that when there are too many preferred 
> locations (currently 8) due to an all-to-all communication pattern, we 
> simply poll the next slot from the MultiMap in 
> {{SlotSharingGroupAssignment}}. The polling algorithm first drains all 
> available slots of a single machine before it polls slots from another 
> machine. 
> I think it would be better to poll slots in a round-robin fashion with 
> respect to the machines. That way we would get better resource utilisation 
> by spreading the tasks more evenly.
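
For illustration, here is a minimal, self-contained sketch of the difference between the two polling strategies; the class and method names are hypothetical and this is not the actual {{SlotSharingGroupAssignment}} code:
{code}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: hand out slots round-robin across machines instead of
// draining one machine before moving on to the next.
public class RoundRobinSlotPicker {

    // machine -> free slot ids (stands in for the MultiMap mentioned above)
    private final Map<String, Deque<String>> slotsByMachine = new LinkedHashMap<>();
    private final Deque<String> machineRotation = new ArrayDeque<>();

    public void addSlot(String machine, String slotId) {
        slotsByMachine.computeIfAbsent(machine, m -> {
            machineRotation.addLast(m);
            return new ArrayDeque<>();
        }).addLast(slotId);
    }

    public String pollNextSlot() {
        int machinesToTry = machineRotation.size();
        while (machinesToTry-- > 0) {
            String machine = machineRotation.pollFirst();
            Deque<String> slots = slotsByMachine.get(machine);
            if (slots != null && !slots.isEmpty()) {
                machineRotation.addLast(machine); // rotate so the next request starts on another machine
                return slots.pollFirst();
            }
            // this machine has no free slots left: drop it from the rotation and keep looking
        }
        return null; // no free slot on any machine
    }
}
{code}
With n = 4 and m = 2 on two TaskManagers with two slots each, rotating over the machines would place one {{m2}} subtask on each TaskManager instead of putting both on TM1.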





[jira] [Commented] (FLINK-2540) LocalBufferPool.requestBuffer gets into infinite loop

2018-03-28 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417058#comment-16417058
 ] 

Renkai Ge commented on FLINK-2540:
--

[~uce] As far as I can see, there is no way for the thread to break out of 
the loop, even if the buffers are consumed by later operators.

> LocalBufferPool.requestBuffer gets into infinite loop
> -
>
> Key: FLINK-2540
> URL: https://issues.apache.org/jira/browse/FLINK-2540
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Gabor Gevay
>Assignee: Ufuk Celebi
>Priority: Blocker
> Fix For: 0.9.1, 0.10.0
>
>
> I'm trying to run a complicated computation that looks like this: [1].
> One of the DataSource->Filter->Map chains finishes fine, but the other one 
> freezes. Debugging shows that it is spinning in the while loop in 
> LocalBufferPool.requestBuffer.
> askToRecycle is false. Both numberOfRequestedMemorySegments and 
> currentPoolSize are 128, so it never enters that if branch either.
> This is a stack trace: [2]
> And here is the code, if you would like to run it: [3]. Unfortunately, I 
> can't make it more minimal, because if I remove some operators, the problem 
> disappears. The class to start is malom.Solver. (On first run, it calculates 
> some lookup tables for a few minutes and puts them into /tmp/movegen.)
> [1] http://compalg.inf.elte.hu/~ggevay/flink/plan.txt
> [2] http://compalg.inf.elte.hu/~ggevay/flink/stacktrace.txt
> [3] https://github.com/ggevay/flink/tree/deadlock-malom
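
For readers unfamiliar with the loop being described, a simplified, self-contained sketch of the reported spin follows; it is not the real LocalBufferPool, just the shape of the condition under which no branch can make progress:
{code}
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified sketch of the reported behaviour, not the real LocalBufferPool.
class BufferPoolSpinSketch {

    private final Queue<byte[]> availableBuffers = new ArrayDeque<>();
    private boolean askToRecycle = false;            // false in the reported case
    private int numberOfRequestedMemorySegments = 128;
    private int currentPoolSize = 128;               // equal, so no new segment is ever requested

    byte[] requestBuffer() {
        while (true) {
            if (askToRecycle) {
                // would hand buffers back; never reached in the reported case
            }
            if (numberOfRequestedMemorySegments < currentPoolSize) {
                numberOfRequestedMemorySegments++;
                return new byte[32 * 1024];          // stand-in for requesting a new memory segment
            }
            byte[] buffer = availableBuffers.poll(); // stays empty if nothing is ever recycled
            if (buffer != null) {
                return buffer;
            }
            // nothing recycled, nothing requestable, nothing returned -> spin forever
        }
    }
}
{code}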





[jira] [Reopened] (FLINK-2540) LocalBufferPool.requestBuffer gets into infinite loop

2018-03-28 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge reopened FLINK-2540:
--

It seems that this issue was never actually fixed. It was closed because of 
some GitHub/JIRA sync bugs.

I still get the same problem in 1.3.2.

> LocalBufferPool.requestBuffer gets into infinite loop
> -
>
> Key: FLINK-2540
> URL: https://issues.apache.org/jira/browse/FLINK-2540
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Gabor Gevay
>Assignee: Ufuk Celebi
>Priority: Blocker
> Fix For: 0.9.1, 0.10.0
>
>
> I'm trying to run a complicated computation that looks like this: [1].
> One of the DataSource->Filter->Map chains finishes fine, but the other one 
> freezes. Debugging shows that it is spinning in the while loop in 
> LocalBufferPool.requestBuffer.
> askToRecycle is false. Both numberOfRequestedMemorySegments and 
> currentPoolSize are 128, so it never enters that if branch either.
> This is a stack trace: [2]
> And here is the code, if you would like to run it: [3]. Unfortunately, I 
> can't make it more minimal, because if I remove some operators, the problem 
> disappears. The class to start is malom.Solver. (On first run, it calculates 
> some lookup tables for a few minutes and puts them into /tmp/movegen.)
> [1] http://compalg.inf.elte.hu/~ggevay/flink/plan.txt
> [2] http://compalg.inf.elte.hu/~ggevay/flink/stacktrace.txt
> [3] https://github.com/ggevay/flink/tree/deadlock-malom





[GitHub] flink pull request #2928: [FLINK-5108] Remove ClientShutdownHook during job ...

2016-12-15 Thread Renkai
Github user Renkai closed the pull request at:

https://github.com/apache/flink/pull/2928




[GitHub] flink issue #2729: [FLINK-4883]Prevent UDFs implementations through Scala si...

2016-12-14 Thread Renkai
Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2729
  
Hi @aljoscha, I'm wondering how the warning could be made "big". Should it 
have a different color in the console, or pop up an alert window when people 
submit applications from the web UI?




[GitHub] flink issue #2928: [FLINK-5108] Remove ClientShutdownHook during job executi...

2016-12-13 Thread Renkai
Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2928
  
It is OK to close this PR.

Max <notificati...@github.com> wrote on Wednesday, December 14, 2016 at 02:31:

> There is one problem we overlooked. In detached mode we ensure cluster
> shutdown through a message sent by the client during job submission to tell
> the JobManager that this is going to be the last job it has to execute. In
> interactive execution mode, the user jar can contain multiple jobs; this is
> mostly useful for interactive batch jobs. Since we just execute the main
> method of the user jar, we don't know how many jobs are submitted and when
> to shut down the cluster. That's why we chose to delegate the shutdown to
> the client for interactive jobs. Thus, I'm hesitant to remove the shutdown
> hook because it ensures that the cluster shuts down during interactive job
> executions. It prevents clusters from lingering around when the client
> shuts down.
>
> A couple of solutions for this problem:
>
> 1. The JobManager watches the client and shuts down a) if it loses the
>    connection to the client and the job it executes has completed, or b) the
>    client tells the JobManager to shut down.
> 2. The JobManager drives the execution, which is now part of the client.
> 3. We don't allow multiple jobs to execute. Then we always have a clear
>    shutdown point. This is perhaps the easiest and most elegant solution. Most
>    users only execute a single job at a time anyway. We can still allow
>    interactive job executions if the user chooses to. Perhaps we can make this
>    more explicit in the API to give a hint to the client.
>
> I'm afraid we will have to close this PR until we realize one of the above
> solutions (or another one).





[GitHub] flink issue #2729: [FLINK-4883]Prevent UDFs implementations through Scala si...

2016-12-07 Thread Renkai
Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2729
  
Hi @aljoscha, keeping a balance between freedom and safety is really hard work. 
I think we could just give users a warning rather than failing and exiting directly.




[GitHub] flink issue #2729: [FLINK-4883]Prevent UDFs implementations through Scala si...

2016-12-07 Thread Renkai
Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2729
  
Hi @aljoscha, I think the case makes sense not only when the parallelism is 
1, but also when users know the code will be executed in parallel, the code is 
thread safe, and they want to save some resources such as memory or network 
connections. However, even if we totally forbid a Scala object from extending a 
UDF function, users can still refer to a Scala object inside a UDF, which may 
cause concurrency exceptions.
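
To illustrate the last point with a Java analogue (hypothetical class, using the real `org.apache.flink.api.common.functions.MapFunction` interface): even when the UDF itself is an ordinary class, it can still reach shared mutable state, so parallel subtasks may race on it.
{code}
import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical illustration: a normal (non-singleton) UDF that still shares state.
public class SharedStateMapper implements MapFunction<Long, Long> {

    // shared by every parallel instance loaded in the same JVM
    private static long seen = 0;       // not thread safe: concurrent updates can be lost

    @Override
    public Long map(Long value) {
        seen++;                         // potential data race when several subtasks run in one JVM
        return value * 2;
    }
}
{code}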




[GitHub] flink issue #2729: [FLINK-4883]Prevent UDFs implementations through Scala si...

2016-12-05 Thread Renkai
Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2729
  
Hi @StefanRRichter, I optimized the code following your advice.




[GitHub] flink pull request #2729: [FLINK-4883]Prevent UDFs implementations through S...

2016-12-04 Thread Renkai
Github user Renkai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2729#discussion_r90775528
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/ScalaObjectChecker.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.InvalidProgramException;
+
+/**
+ * Scala Object checker tries to verify if a class is implemented by
+ * Scala Object
+ */
+@Internal
+public class ScalaObjectChecker {
+   public static boolean isPotentialScalaObject(Object o) {
--- End diff --

Hi @StefanRRichter, I'm waiting for your further advice. Could you take a 
glance at this when you are available?




[GitHub] flink pull request #2928: [FLINK-5108] Remove ClientShutdownHook during job ...

2016-12-03 Thread Renkai
GitHub user Renkai opened a pull request:

https://github.com/apache/flink/pull/2928

[FLINK-5108] Remove ClientShutdownHook during job execution

This patch simply removes the ClientShutdownHook related code. The change may 
cause `org.apache.flink.yarn.YarnClusterClient#pollingRunner` to be abruptly 
stopped by the process exit, but that seems OK because the polling runner 
thread is a daemon thread.
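
For context, a minimal sketch (not the actual Flink code) of the two mechanisms involved: a daemon thread dies with the process, whereas a registered shutdown hook runs on process exit and could tear the cluster down.
{code}
// Minimal sketch of the mechanism being discussed, not the actual Flink patch.
public class ShutdownHookSketch {

    public static void main(String[] args) {
        Thread poller = new Thread(() -> {
            while (true) {
                // ... poll cluster status ...
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "polling-runner");
        poller.setDaemon(true);  // a daemon thread does not keep the JVM alive; it is simply killed on exit
        poller.start();

        // What the removed hook did, conceptually: react to client exit (e.g. CTRL-C)
        // and shut the cluster down. Without it, an interrupt no longer tears the job down.
        // Runtime.getRuntime().addShutdownHook(new Thread(() -> { /* shut down cluster */ }));
    }
}
{code}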

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Renkai/flink FLINK-5108

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2928.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2928


commit 7504d57b2e24f70b96c0761102b689bf62653db5
Author: renkai <gaelook...@gmail.com>
Date:   2016-12-03T11:27:39Z

remove ClientShutdownHook






[jira] [Commented] (FLINK-5108) Remove ClientShutdownHook during job execution

2016-12-02 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15714849#comment-15714849
 ] 

Renkai Ge commented on FLINK-5108:
--

I think it could be ready this weekend. Sorry for the delay; I'm still
reviewing the current code.

Robert Metzger (JIRA) <j...@apache.org> wrote on Friday, December 2, 2016 at 19:01:



> Remove ClientShutdownHook during job execution
> --
>
> Key: FLINK-5108
> URL: https://issues.apache.org/jira/browse/FLINK-5108
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Maximilian Michels
>Assignee: Renkai Ge
>Priority: Blocker
> Fix For: 1.2.0
>
>
> The behavior of the Standalone mode is to not react to client interrupts once 
> a job has been deployed. We should change the Yarn client implementation to 
> behave the same. This avoids accidental shutdown of the job, e.g. when the 
> user sends an interrupt via CTRL-C or when the client machine shuts down.





[jira] [Commented] (FLINK-5108) Remove ClientShutdownHook during job execution

2016-11-29 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704795#comment-15704795
 ] 

Renkai Ge commented on FLINK-5108:
--

I want to work on this.

> Remove ClientShutdownHook during job execution
> --
>
> Key: FLINK-5108
> URL: https://issues.apache.org/jira/browse/FLINK-5108
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Maximilian Michels
>Assignee: Renkai Ge
>Priority: Blocker
> Fix For: 1.2.0
>
>
> The behavior of the Standalone mode is to not react to client interrupts once 
> a job has been deployed. We should change the Yarn client implementation to 
> behave the same. This avoids accidental shutdown of the job, e.g. when the 
> user sends an interrupt via CTRL-C or when the client machine shuts down.





[jira] [Assigned] (FLINK-5108) Remove ClientShutdownHook during job execution

2016-11-29 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge reassigned FLINK-5108:


Assignee: Renkai Ge

> Remove ClientShutdownHook during job execution
> --
>
> Key: FLINK-5108
> URL: https://issues.apache.org/jira/browse/FLINK-5108
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Maximilian Michels
>Assignee: Renkai Ge
>Priority: Blocker
> Fix For: 1.2.0
>
>
> The behavior of the Standalone mode is to not react to client interrupts once 
> a job has been deployed. We should change the Yarn client implementation to 
> behave the same. This avoids accidental shutdown of the job, e.g. when the 
> user sends an interrupt via CTRL-C or when the client machine shuts down.





[GitHub] flink pull request #2893: [FLINK-5128]Get Kafka partitions in FlinkKafkaProd...

2016-11-28 Thread Renkai
GitHub user Renkai opened a pull request:

https://github.com/apache/flink/pull/2893

[FLINK-5128]Get Kafka partitions in FlinkKafkaProducer only if a 
partitioner is set



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Renkai/flink FLINK-5128

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2893.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2893


commit 7c78da6f1edca675ceced255eff2fa73044edef1
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-29T06:26:00Z

Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set






[jira] [Commented] (FLINK-5128) Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set

2016-11-24 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692944#comment-15692944
 ] 

Renkai Ge commented on FLINK-5128:
--

I want to work on this.

> Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set
> ---
>
> Key: FLINK-5128
> URL: https://issues.apache.org/jira/browse/FLINK-5128
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Minor
>
> The fetched partitions list is only used when calling {{open(...)}} for a 
> user supplied custom partitioner in {{FlinkKafkaProducer}}.
> Therefore, we can actually only fetch the partition list if the user used a 
> partitioner (right now we always do the partition fetching).
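
A minimal sketch of the proposed change (hypothetical types, not the actual {{FlinkKafkaProducer}} code): the partition list is fetched only when a custom partitioner actually needs it.
{code}
// Hypothetical sketch of the proposed behaviour, not the actual FlinkKafkaProducer code.
class ProducerOpenSketch {

    interface CustomPartitioner {
        void open(int[] partitions);
    }

    private final CustomPartitioner partitioner;   // null when the user did not supply one

    ProducerOpenSketch(CustomPartitioner partitioner) {
        this.partitioner = partitioner;
    }

    void open() {
        if (partitioner != null) {
            int[] partitions = fetchPartitionsFromKafka();  // pay for the metadata request only when needed
            partitioner.open(partitions);
        }
        // no custom partitioner -> skip fetching the partition list entirely
    }

    private int[] fetchPartitionsFromKafka() {
        return new int[] {0, 1, 2};                         // stand-in for the real metadata call
    }
}
{code}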





[jira] [Assigned] (FLINK-5128) Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set

2016-11-24 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge reassigned FLINK-5128:


Assignee: Renkai Ge

> Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set
> ---
>
> Key: FLINK-5128
> URL: https://issues.apache.org/jira/browse/FLINK-5128
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Renkai Ge
>Priority: Minor
>
> The fetched partitions list is only used when calling {{open(...)}} for a 
> user supplied custom partitioner in {{FlinkKafkaProducer}}.
> Therefore, we can actually only fetch the partition list if the user used a 
> partitioner (right now we always do the partition fetching).





[GitHub] flink pull request #2847: [FLINK-5031]Consecutive DataStream.split() ignored

2016-11-21 Thread Renkai
GitHub user Renkai opened a pull request:

https://github.com/apache/flink/pull/2847

[FLINK-5031]Consecutive DataStream.split() ignored

I think this is one way to solve this issue, but it might not be the best 
one. Since I don't know the code base well enough, I hope someone can review it 
and give some advice.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Renkai/flink FLINK-5031

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2847.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2847


commit e216c79cd99c08d92847a6254c13fc8d75bb94c3
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T06:38:03Z

add a new StreamNode when split

commit 8ce23e658102c35c58946429fd0fde5e72d722df
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T07:00:04Z

fix test case

commit 6672354819f2a6411f1a4c9479653f421b6163c6
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T07:09:01Z

delete output selectors check since those infos are moved to split1

commit d59900a19c0d966ee175fbbe9661004cec556670
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T07:44:58Z

add unit test for consecutive split

commit 8198ee28e95371651d8c159db36dfcf6163c1659
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T02:22:16Z

add unit test for consecutive split

commit 2a0d0ec730c1b7d5c955a4ef374ebd7fd6c58f5c
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T03:14:37Z

add unit test for consecutive split

commit 410b8717b76df8bfa40b59f1593439442d34ec49
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T03:26:18Z

add unit test for consecutive split

commit 290f2f6f6fc1a249cf30d04cc31429e95dc0ec44
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T03:50:13Z

add unit test for consecutive split






[jira] [Comment Edited] (FLINK-5031) Consecutive DataStream.split() ignored

2016-11-18 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15678716#comment-15678716
 ] 

Renkai Ge edited comment on FLINK-5031 at 11/19/16 6:29 AM:


[~fhueske] The second split was not ignored; it was unioned with the first 
one: {code}union({1,2},{1,2,3,4,5})={1,2,3,4,5}{code}. If the second select 
is changed to "GreaterEqual", the result would be 
{code}{3,4,5,6,7,8,9,10,11}{code}, that is 
{code}union({3,4,5,6,7,8,9,10,11},{6,7,8,9,10,11}){code}, see 
https://github.com/apache/flink/blob/a612b9966f3ee020a5721ac2f039a3633c40146c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java#L114.
In the current implementation of split you get the unioned result of all 
split combinations, which I find somewhat strange. We might solve this 
issue by reimplementing the split function with a OneInputTransformation.


was (Author: renkaige):
[~fhueske]The second split was not ignored, it was unioned by the first 
one.{code}union({1,2},{1,2,3,4,5})={1,2,3,4,5}{code},if the second select 
change to "GreaterEqual", the result would be {3,4,5,6,7,8,9,10,11},that was 
{code} union({3,4,5,6,7,8,9,10,11},{6,7,8,9,10,11}) {code} see 
https://github.com/apache/flink/blob/a612b9966f3ee020a5721ac2f039a3633c40146c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java#L114.
In the current implementation of split, you will get a unioned result of all 
split combination, I think it  was strange somehow.We might solve this 
issue by reimplement the split function by an OneInputTransformation.

> Consecutive DataStream.split() ignored
> --
>
> Key: FLINK-5031
> URL: https://issues.apache.org/jira/browse/FLINK-5031
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.3
>    Reporter: Fabian Hueske
>Assignee: Renkai Ge
> Fix For: 1.2.0
>
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector {
>   long threshold;
>   public ThresholdSelector(long threshold) {
>   this.threshold = threshold;
>   }
>   @Override
>   public Iterable select(Long value) {
>   if (value < threshold) {
>   return Collections.singletonList("Less");
>   } else {
>   return Collections.singletonList("GreaterEqual");
>   }
>   }
> }
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1);
>   SplitStream split1 = env.generateSequence(1, 11)
>   .split(new ThresholdSelector(6));
>   // stream11 should be [1,2,3,4,5]
>   DataStream stream11 = split1.select("Less");
>   SplitStream split2 = stream11
> //.map(new MapFunction<Long, Long>() {
> //@Override
> //public Long map(Long value) throws Exception {
> //return value;
> //}
> //})
>   .split(new ThresholdSelector(3));
>   DataStream stream21 = split2.select("Less");
>   // stream21 should be [1,2]
>   stream21.print();
>   env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second 
> {{split}} operation is ignored.
> The program is correctly evaluated if the identity {{MapFunction}} is added to 
> the program.





[jira] [Commented] (FLINK-5031) Consecutive DataStream.split() ignored

2016-11-18 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15678716#comment-15678716
 ] 

Renkai Ge commented on FLINK-5031:
--

[~fhueske] The second split was not ignored; it was unioned with the first 
one: {code}union({1,2},{1,2,3,4,5})={1,2,3,4,5}{code}. If the second select 
is changed to "GreaterEqual", the result would be {3,4,5,6,7,8,9,10,11}, that is 
{code}union({3,4,5,6,7,8,9,10,11},{6,7,8,9,10,11}){code}, see 
https://github.com/apache/flink/blob/a612b9966f3ee020a5721ac2f039a3633c40146c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java#L114.
In the current implementation of split you get the unioned result of all 
split combinations, which I find somewhat strange. We might solve this 
issue by reimplementing the split function with a OneInputTransformation.
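
A small standalone sketch of the union effect described above (hypothetical {{Selector}} type, not Flink's {{OutputSelector}}): an element is emitted if any of the chained selectors routes it to the selected name, which reproduces the observed output.
{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the union semantics described in the comment above.
class SplitUnionSketch {

    interface Selector {
        List<String> select(long value);
    }

    public static void main(String[] args) {
        Selector first = v -> Collections.singletonList(v < 6 ? "Less" : "GreaterEqual");
        Selector second = v -> Collections.singletonList(v < 3 ? "Less" : "GreaterEqual");

        List<Long> out = new ArrayList<>();
        for (long v = 1; v <= 11; v++) {
            boolean firstSaysLess = first.select(v).contains("Less");
            boolean secondSaysLess = second.select(v).contains("Less");
            if (firstSaysLess || secondSaysLess) {  // union of the two selections
                out.add(v);
            }
        }
        System.out.println(out);  // [1, 2, 3, 4, 5] rather than the expected [1, 2]
    }
}
{code}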

> Consecutive DataStream.split() ignored
> --
>
> Key: FLINK-5031
> URL: https://issues.apache.org/jira/browse/FLINK-5031
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
>Assignee: Renkai Ge
> Fix For: 1.2.0
>
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector {
>   long threshold;
>   public ThresholdSelector(long threshold) {
>   this.threshold = threshold;
>   }
>   @Override
>   public Iterable select(Long value) {
>   if (value < threshold) {
>   return Collections.singletonList("Less");
>   } else {
>   return Collections.singletonList("GreaterEqual");
>   }
>   }
> }
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1);
>   SplitStream split1 = env.generateSequence(1, 11)
>   .split(new ThresholdSelector(6));
>   // stream11 should be [1,2,3,4,5]
>   DataStream stream11 = split1.select("Less");
>   SplitStream split2 = stream11
> //.map(new MapFunction<Long, Long>() {
> //@Override
> //public Long map(Long value) throws Exception {
> //return value;
> //}
> //})
>   .split(new ThresholdSelector(3));
>   DataStream stream21 = split2.select("Less");
>   // stream21 should be [1,2]
>   stream21.print();
>   env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second 
> {{split}} operation is ignored.
> The program is correctly evaluated if the identity {{MapFunction}} is added to 
> the program.





[jira] [Commented] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE

2016-11-17 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15675265#comment-15675265
 ] 

Renkai Ge commented on FLINK-4587:
--

Yes, it was solved in the way described.

Stephan Ewen (JIRA) <j...@apache.org> wrote on Friday, November 18, 2016 at 00:15:



> Yet another java.lang.NoSuchFieldError: INSTANCE
> 
>
> Key: FLINK-4587
> URL: https://issues.apache.org/jira/browse/FLINK-4587
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Local Runtime
>Affects Versions: 1.2.0
> Environment: Latest SNAPSHOT
>Reporter: Renkai Ge
> Attachments: diff in mvn clean package.png, flink-explore-src.zip
>
>
> For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 
> in flink.
> The source file is:
> {code}
> import org.apache.flink.streaming.api.scala._
> import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory
> /**
>   * Created by renkai on 16/9/7.
>   */
> object Main {
>   def main(args: Array[String]): Unit = {
> val instance = ManagedNHttpClientConnectionFactory.INSTANCE
> println("instance = " + instance)
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(1 to 100)
> val result = stream.map { x =>
>   x * 2
> }
> result.print()
> env.execute("xixi")
>   }
> }
> {code}
> and 
> {code}
> name := "flink-explore"
> version := "1.0"
> scalaVersion := "2.11.8"
> crossPaths := false
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
> )
> {code}
> I use `sbt assembly` to get a fat jar.
> If I run the command
> {code}
>  java -cp flink-explore-assembly-1.0.jar Main
> {code}
> I get the result 
> {code}
> instance = 
> org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.api.scala.ClosureCleaner$).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
> 09/07/2016 12:05:26   Job execution switched to status RUNNING.
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to SCHEDULED
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to DEPLOYING
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(20/24) switched to RUNNING
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(19/24) switched to RUNNING
> 15> 30
> 20> 184
> ...
> 19> 182
> 1> 194
> 8> 160
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to FINISHED
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(1/24) switched to FINISHED
> 09/07/2016 12:05:26   Job execution switched to status FINISHED.
> {code}
> Nothing special.
> But if I run the jar by
> {code}
> ./bin/flink run flink-explore-assembly-1.0.jar
> {code}
> I will get an error
> {code}
> $ ./bin/flink run flink-explore-assembly-1.0.jar
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> 
>  The program finished with the following exception:
> java.lang.NoSuchFieldError: INSTANCE
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:53)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:57)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpR

[jira] [Commented] (FLINK-5031) Consecutive DataStream.split() ignored

2016-11-14 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15666015#comment-15666015
 ] 

Renkai Ge commented on FLINK-5031:
--

I want to work on this issue

> Consecutive DataStream.split() ignored
> --
>
> Key: FLINK-5031
> URL: https://issues.apache.org/jira/browse/FLINK-5031
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
> Fix For: 1.2.0
>
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector {
>   long threshold;
>   public ThresholdSelector(long threshold) {
>   this.threshold = threshold;
>   }
>   @Override
>   public Iterable select(Long value) {
>   if (value < threshold) {
>   return Collections.singletonList("Less");
>   } else {
>   return Collections.singletonList("GreaterEqual");
>   }
>   }
> }
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1);
>   SplitStream split1 = env.generateSequence(1, 11)
>   .split(new ThresholdSelector(6));
>   // stream11 should be [1,2,3,4,5]
>   DataStream stream11 = split1.select("Less");
>   SplitStream split2 = stream11
> //.map(new MapFunction<Long, Long>() {
> //@Override
> //public Long map(Long value) throws Exception {
> //return value;
> //}
> //})
>   .split(new ThresholdSelector(3));
>   DataStream stream21 = split2.select("Less");
>   // stream21 should be [1,2]
>   stream21.print();
>   env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second 
> {{split}} operation is ignored.
> The program is correctly evaluated if the identity {{MapFunction}} is added to 
> the program.





[jira] [Assigned] (FLINK-5031) Consecutive DataStream.split() ignored

2016-11-14 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge reassigned FLINK-5031:


Assignee: Renkai Ge

> Consecutive DataStream.split() ignored
> --
>
> Key: FLINK-5031
> URL: https://issues.apache.org/jira/browse/FLINK-5031
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
>Assignee: Renkai Ge
> Fix For: 1.2.0
>
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector {
>   long threshold;
>   public ThresholdSelector(long threshold) {
>   this.threshold = threshold;
>   }
>   @Override
>   public Iterable select(Long value) {
>   if (value < threshold) {
>   return Collections.singletonList("Less");
>   } else {
>   return Collections.singletonList("GreaterEqual");
>   }
>   }
> }
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1);
>   SplitStream split1 = env.generateSequence(1, 11)
>   .split(new ThresholdSelector(6));
>   // stream11 should be [1,2,3,4,5]
>   DataStream stream11 = split1.select("Less");
>   SplitStream split2 = stream11
> //.map(new MapFunction<Long, Long>() {
> //@Override
> //public Long map(Long value) throws Exception {
> //return value;
> //}
> //})
>   .split(new ThresholdSelector(3));
>   DataStream stream21 = split2.select("Less");
>   // stream21 should be [1,2]
>   stream21.print();
>   env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second 
> {{split}} operation is ignored.
> The program is correctly evaluated if the identity {{MapFunction}} is added to 
> the program.





[GitHub] flink issue #2729: [FLINK-4883]Prevent UDFs implementations through Scala si...

2016-11-14 Thread Renkai
Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2729
  
I don't quite understand why some functions are cleaned while others are not 
in the current version of the Scala API, but I think all of them need to be 
checked, so I added checks in the places that had no clean calls.




[GitHub] flink pull request #2729: [FLINK-4883]Prevent UDFs implementations through S...

2016-11-14 Thread Renkai
Github user Renkai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2729#discussion_r87792198
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/ScalaObjectChecker.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.InvalidProgramException;
+
+/**
+ * Scala Object checker tries to verify if a class is implemented by
+ * Scala Object
+ */
+@Internal
+public class ScalaObjectChecker {
+   public static boolean isPotentialScalaObject(Object o) {
--- End diff --

The approach you suggested has a shortcoming: all functions in the call stack 
need to carry the `(implicit ev: A <:< Singleton = null)` implicit parameter to 
make it work (see 
https://gist.github.com/Renkai/ddd59337d3c302eb3d061ff8d249413c), so too many 
function signatures would need to change and the engineering complexity would 
increase a lot. I'm afraid we have to keep the current version of 
ScalaObjectChecker until we find a better solution.




[GitHub] flink pull request #2729: [FLINK-4883]Prevent UDFs implementations through S...

2016-11-13 Thread Renkai
Github user Renkai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2729#discussion_r87743796
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/ScalaObjectChecker.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.InvalidProgramException;
+
+/**
+ * Scala Object checker tries to verify if a class is implemented by
+ * Scala Object
+ */
+@Internal
+public class ScalaObjectChecker {
+   public static boolean isPotentialScalaObject(Object o) {
--- End diff --

Thanks! It's quite helpful. Could you tell me where you found this?




[GitHub] flink pull request #2729: [FLINK-4883]Prevent UDFs implementations through S...

2016-11-10 Thread Renkai
Github user Renkai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2729#discussion_r87350332
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -181,7 +181,17 @@ protected void fillInType(TypeInformation typeInfo) 
{
return this.type;
}
 
+   /**
+ *  1. Check if the function is implemented by a scala object. Checks 
only if scala object function forbidden
+ *is not disabled in the 
[[org.apache.flink.api.common.ExecutionConfig]]
+ *
+ *  2. Returns a "closure-cleaned" version of the given function. 
Cleans only if closure cleaning
+ * is not disabled in the 
[[org.apache.flink.api.common.ExecutionConfig]]
+ */
public  F clean(F f) {
--- End diff --

Thanks for the review. I tried to add a new API to DataSet, but I got a compile 
error; I think some Maven plugin prevents me from changing the API. Could you 
tell me how to pass the compilation and API check?




[GitHub] flink pull request #2729: [FLINK-4883]Prevent UDFs implementations through S...

2016-10-31 Thread Renkai
GitHub user Renkai opened a pull request:

https://github.com/apache/flink/pull/2729

[FLINK-4883]Prevent UDFs implementations through Scala singleton objects

The code changes should work, but I was quite confused for these 
reasons:

1. The Scala wrappers for DataStream and DataSet do almost the same 
thing, but they have a lot of differences in code style, naming rules and code 
structure.
2. Some of the DataSet operators are wrapped in the clean function, 
while some others are not; I think the missing ones might simply have been forgotten.
3. Scala and Java have different versions of the ClosureCleaner. I think one 
ClosureCleaner should be enough, or the Java ClosureCleaner could be the 
basic version and the Scala version could call the Java version directly and add some 
additional logic.

I think the Scala wrapper of DataSet could be rewritten to make it consistent 
with the wrapper of DataStream; maybe we can create another issue ticket to do 
that.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Renkai/flink FLINK-4883

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2729


commit 508a026bb5bf56befb778ce625f6872f70753a87
Author: renkai <gaelook...@gmail.com>
Date:   2016-10-30T14:28:41Z

Prevent UDFs implementations through Scala singleton objects

commit 6d57537a4c462f0d21a0aaff143f7fe5ef16b064
Author: renkai <gaelook...@gmail.com>
Date:   2016-10-30T14:41:30Z

Prevent UDFs implementations through Scala singleton objects

commit 0ddc539fb8608ad840a6914b8c73300994e4
Author: renkai <gaelook...@gmail.com>
Date:   2016-10-30T15:02:33Z

revert method name since

[ERROR] Failed to execute goal 
com.github.siom79.japicmp:japicmp-maven-plugin:0.7.0:cmp (default) on project 
flink-java: Breaking the build because there is at least one binary 
incompatible class: org.apache.flink.api.java.DataSet

commit 9895d6ef27754a1d3b667b17cce5aeb6d30081d1
Author: renkai <gaelook...@gmail.com>
Date:   2016-10-30T15:33:00Z

make function serializable

commit bb40ac78d9833d0607dc4e8ded922b7e567cb68b
Author: renkai <gaelook...@gmail.com>
Date:   2016-10-30T15:57:53Z

solve
error 
file=/home/travis/build/Renkai/flink/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/ScalaObjectCheckerTest.scala
 message=Header does not match expected text line=10

commit c75bcb60a2edabd1fd6cfd2270218687c3c747da
Author: renkai <gaelook...@gmail.com>
Date:   2016-10-31T00:35:27Z

solve
error 
file=/home/travis/build/Renkai/flink/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaObjectCheckStreamTest.scala
 message=Header does not match expected text line=10






[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-10-30 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15619663#comment-15619663
 ] 

Renkai Ge commented on FLINK-4883:
--

I tried to implement it in the flink-scala and flink-streaming-scala modules, but it 
seems that these two modules assume flink-java and flink-streaming-java do most of 
the work and that flink-scala and flink-streaming-scala are just thin wrappers. I 
think I have to change the code in flink-java and flink-streaming-java and write 
tests in flink-scala and flink-streaming-scala, otherwise too much code would have 
to be changed.

> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>    Assignee: Renkai Ge
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems as the UDF is now a singleton that Flink 
> could use across several operator instances, which leads to job failures. We 
> should detect and prevent the usage of singleton UDFs.





[GitHub] flink pull request #2726: [FLINK-4883]Prevent UDFs implementations through S...

2016-10-30 Thread Renkai
Github user Renkai closed the pull request at:

https://github.com/apache/flink/pull/2726




[GitHub] flink issue #2726: [FLINK-4883]Prevent UDFs implementations through Scala si...

2016-10-30 Thread Renkai
Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2726
  
Not prepared yet




[GitHub] flink pull request #2726: Flink 4883

2016-10-29 Thread Renkai
GitHub user Renkai opened a pull request:

https://github.com/apache/flink/pull/2726

Flink 4883

Ideas and code structure are basically copied from 
`org.apache.flink.api.java.ClosureCleaner`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Renkai/flink FLINK-4883

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2726.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2726


commit c81de980f3ef5eb98fe235e359e52b3a13af6108
Author: renkai <gaelook...@gmail.com>
Date:   2016-10-30T04:40:31Z

add check scala object function logic

commit d159d7fc1cf286035bb096bb9d2922476f4b21a4
Author: renkai <gaelook...@gmail.com>
Date:   2016-10-30T05:08:40Z

add check scala object function logic
---
add test






[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-10-27 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611756#comment-15611756
 ] 

Renkai Ge commented on FLINK-4883:
--

How about extending the method 
org.apache.flink.streaming.api.datastream.DataStream#clean so that it not only 
cleans the function but also checks whether the function is a Scala object? I 
think that is the way to implement the detection with minimal changes.
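
A minimal sketch of what such a hook could look like (hypothetical helper; {{InvalidProgramException}} is the real exception from {{org.apache.flink.api.common}}):
{code}
import org.apache.flink.api.common.InvalidProgramException;

// Hypothetical sketch: clean() also rejects UDFs that look like Scala singleton objects.
class CleanWithObjectCheck {

    static <F> F clean(F f, boolean checkScalaObjects, boolean doClosureCleaning) {
        if (checkScalaObjects && f.getClass().getName().endsWith("$")) {
            throw new InvalidProgramException(
                    "UDFs implemented as Scala singleton objects are not allowed: " + f.getClass().getName());
        }
        if (doClosureCleaning) {
            // delegate to the existing ClosureCleaner here
        }
        return f;
    }
}
{code}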

> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>    Assignee: Renkai Ge
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems as the UDF is now a singleton that Flink 
> could use across several operator instances, which leads to job failures. We 
> should detect and prevent the usage of singleton UDFs.





[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-10-26 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15607635#comment-15607635
 ] 

Renkai Ge commented on FLINK-4883:
--

I found a way to distinguish normal Scala instances from objects: the class 
name of a Scala object is suffixed with a `$` symbol. Though Java users can also 
name a class with a trailing `$`, it is discouraged and people seldom do it.

I think we can scan every function in the generated execution plan before it is 
sent to the cluster; if any function has a class name suffixed with `$`, we can 
throw an exception and tell users not to use Scala objects to implement functions.

The risk is that distinguishing objects from normal classes by class name is a 
hack rather than a Scala standard; future versions of Scala may implement 
objects differently and this method would then lose its effectiveness.

What's your opinion? [~srichter]
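
A minimal sketch of the detection idea (hypothetical code, not necessarily identical to the ScalaObjectChecker in the pull request): Scala compiles {{object Foo}} to a class named {{Foo$}} with a static {{MODULE$}} field holding the singleton instance.
{code}
// Hypothetical sketch of the class-name based detection described above.
class ScalaObjectDetectionSketch {

    static boolean looksLikeScalaObject(Object udf) {
        Class<?> clazz = udf.getClass();
        if (!clazz.getName().endsWith("$")) {
            return false;                              // ordinary classes rarely end with '$'
        }
        try {
            // the singleton instance Scala generates for an `object`
            return clazz.getDeclaredField("MODULE$").get(null) == udf;
        } catch (ReflectiveOperationException e) {
            return true;                               // ends with '$' but no MODULE$: still suspicious
        }
    }
}
{code}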

> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>    Assignee: Renkai Ge
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems as the UDF is now a singleton that Flink 
> could use across several operator instances, which leads to job failures. We 
> should detect and prevent the usage of singleton UDFs.





[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-10-24 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15604147#comment-15604147
 ] 

Renkai Ge commented on FLINK-4883:
--

I tried to use an object to implement a UDF, but I didn't get a failure. Could 
you give me some example code that causes a failure?

> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>    Assignee: Renkai Ge
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems as the UDF is now a singleton that Flink 
> could use across several operator instances, which leads to job failures. We 
> should detect and prevent the usage of singleton UDFs.





[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-10-23 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15600886#comment-15600886
 ] 

Renkai Ge commented on FLINK-4883:
--

I want to work on this issue.

> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems as the UDF is now a singleton that Flink 
> could use across several operator instances, which leads to job failures. We 
> should detect and prevent the usage of singleton UDFs.





[jira] [Commented] (FLINK-1605) Create a shaded Hadoop fat jar to resolve library version conflicts

2016-09-21 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509539#comment-15509539
 ] 

Renkai Ge commented on FLINK-1605:
--

Hi [~rmetzger],
Could you help take a look at this issue 
https://issues.apache.org/jira/browse/FLINK-4587?filter=-2
and this problem?
http://stackoverflow.com/questions/39610268/different-result-of-mvn-clean-package-when-execute-in-different-directory
Thanks a lot!

> Create a shaded Hadoop fat jar to resolve library version conflicts
> ---
>
> Key: FLINK-1605
> URL: https://issues.apache.org/jira/browse/FLINK-1605
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 0.9
>
>
> As per mailing list discussion: 
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-shaded-Hadoop-fat-jar-to-resolve-library-version-conflicts-td3881.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE

2016-09-19 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge updated FLINK-4587:
-
Component/s: Build System

> Yet another java.lang.NoSuchFieldError: INSTANCE
> 
>
> Key: FLINK-4587
> URL: https://issues.apache.org/jira/browse/FLINK-4587
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Local Runtime
>Affects Versions: 1.2.0
> Environment: Latest SNAPSHOT
>Reporter: Renkai Ge
> Attachments: diff in mvn clean package.png, flink-explore-src.zip
>
>
> For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 
> in flink.
> The source file is:
> {code}
> import org.apache.flink.streaming.api.scala._
> import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory
> /**
>   * Created by renkai on 16/9/7.
>   */
> object Main {
>   def main(args: Array[String]): Unit = {
> val instance = ManagedNHttpClientConnectionFactory.INSTANCE
> println("instance = " + instance)
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(1 to 100)
> val result = stream.map { x =>
>   x * 2
> }
> result.print()
> env.execute("xixi")
>   }
> }
> {code}
> and 
> {code}
> name := "flink-explore"
> version := "1.0"
> scalaVersion := "2.11.8"
> crossPaths := false
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
> )
> {code}
> I use `sbt assembly` to get a fat jar.
> If I run the command
> {code}
>  java -cp flink-explore-assembly-1.0.jar Main
> {code}
> I get the result 
> {code}
> instance = 
> org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.api.scala.ClosureCleaner$).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
> 09/07/2016 12:05:26   Job execution switched to status RUNNING.
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to SCHEDULED
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to DEPLOYING
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(20/24) switched to RUNNING
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(19/24) switched to RUNNING
> 15> 30
> 20> 184
> ...
> 19> 182
> 1> 194
> 8> 160
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to FINISHED
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(1/24) switched to FINISHED
> 09/07/2016 12:05:26   Job execution switched to status FINISHED.
> {code}
> Nothing special.
> But if I run the jar by
> {code}
> ./bin/flink run shop-monitor-flink-assembly-1.0.jar
> {code}
> I will get an error
> {code}
> $ ./bin/flink run flink-explore-assembly-1.0.jar
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> 
>  The program finished with the following exception:
> java.lang.NoSuchFieldError: INSTANCE
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:53)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:57)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:47)
>   at 
> org.apache.http.impl.nio.conn.ManagedNHttpClientConnectio

[jira] [Comment Edited] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE

2016-09-19 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15502441#comment-15502441
 ] 

Renkai Ge edited comment on FLINK-4587 at 9/19/16 9:18 AM:
---

logs from executing mvn in the root directory vs. the flink-dist directory

see the png in the attachments


was (Author: renkaige):
logs from executing mvn in the root directory vs. the flink-dist directory

> Yet another java.lang.NoSuchFieldError: INSTANCE
> 
>
> Key: FLINK-4587
> URL: https://issues.apache.org/jira/browse/FLINK-4587
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0
> Environment: Latest SNAPSHOT
>Reporter: Renkai Ge
> Attachments: diff in mvn clean package.png, flink-explore-src.zip
>
>
> For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 
> in flink.
> The source file is:
> {code}
> import org.apache.flink.streaming.api.scala._
> import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory
> /**
>   * Created by renkai on 16/9/7.
>   */
> object Main {
>   def main(args: Array[String]): Unit = {
> val instance = ManagedNHttpClientConnectionFactory.INSTANCE
> println("instance = " + instance)
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(1 to 100)
> val result = stream.map { x =>
>   x * 2
> }
> result.print()
> env.execute("xixi")
>   }
> }
> {code}
> and 
> {code}
> name := "flink-explore"
> version := "1.0"
> scalaVersion := "2.11.8"
> crossPaths := false
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
> )
> {code}
> I use `sbt assembly` to get a fat jar.
> If I run the command
> {code}
>  java -cp flink-explore-assembly-1.0.jar Main
> {code}
> I get the result 
> {code}
> instance = 
> org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.api.scala.ClosureCleaner$).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
> 09/07/2016 12:05:26   Job execution switched to status RUNNING.
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to SCHEDULED
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to DEPLOYING
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(20/24) switched to RUNNING
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(19/24) switched to RUNNING
> 15> 30
> 20> 184
> ...
> 19> 182
> 1> 194
> 8> 160
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to FINISHED
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(1/24) switched to FINISHED
> 09/07/2016 12:05:26   Job execution switched to status FINISHED.
> {code}
> Nothing special.
> But if I run the jar by
> {code}
> ./bin/flink run shop-monitor-flink-assembly-1.0.jar
> {code}
> I will get an error
> {code}
> $ ./bin/flink run flink-explore-assembly-1.0.jar
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> 
>  The program finished with the following exception:
> java.lang.NoSuchFieldError: INSTANCE
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:53)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpReq

[jira] [Commented] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE

2016-09-19 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15502715#comment-15502715
 ] 

Renkai Ge commented on FLINK-4587:
--

The released binary at https://flink.apache.org/downloads.html#source is OK, but 
I got the same problem when building from the release source 1.1.2. Do you really 
use the command {code}mvn clean package -DskipTests{code} to generate the release 
binaries, or do you have some other scripts?

> Yet another java.lang.NoSuchFieldError: INSTANCE
> 
>
> Key: FLINK-4587
> URL: https://issues.apache.org/jira/browse/FLINK-4587
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0
> Environment: Latest SNAPSHOT
>Reporter: Renkai Ge
> Attachments: diff in mvn clean package.png, flink-explore-src.zip
>
>
> For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 
> in flink.
> The source file is:
> {code}
> import org.apache.flink.streaming.api.scala._
> import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory
> /**
>   * Created by renkai on 16/9/7.
>   */
> object Main {
>   def main(args: Array[String]): Unit = {
> val instance = ManagedNHttpClientConnectionFactory.INSTANCE
> println("instance = " + instance)
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(1 to 100)
> val result = stream.map { x =>
>   x * 2
> }
> result.print()
> env.execute("xixi")
>   }
> }
> {code}
> and 
> {code}
> name := "flink-explore"
> version := "1.0"
> scalaVersion := "2.11.8"
> crossPaths := false
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
> )
> {code}
> I use `sbt assembly` to get a fat jar.
> If I run the command
> {code}
>  java -cp flink-explore-assembly-1.0.jar Main
> {code}
> I get the result 
> {code}
> instance = 
> org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.api.scala.ClosureCleaner$).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
> 09/07/2016 12:05:26   Job execution switched to status RUNNING.
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to SCHEDULED
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to DEPLOYING
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(20/24) switched to RUNNING
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(19/24) switched to RUNNING
> 15> 30
> 20> 184
> ...
> 19> 182
> 1> 194
> 8> 160
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to FINISHED
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(1/24) switched to FINISHED
> 09/07/2016 12:05:26   Job execution switched to status FINISHED.
> {code}
> Nothing special.
> But if I run the jar by
> {code}
> ./bin/flink run shop-monitor-flink-assembly-1.0.jar
> {code}
> I will get an error
> {code}
> $ ./bin/flink run flink-explore-assembly-1.0.jar
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> 
>  The program finished with the following exception:
> java.lang.NoSuchFieldError: INSTANCE
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:53)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpReq

[jira] [Updated] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE

2016-09-19 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge updated FLINK-4587:
-
Attachment: diff in mvn clean package.png

logs from executing mvn in the root directory vs. the flink-dist directory

> Yet another java.lang.NoSuchFieldError: INSTANCE
> 
>
> Key: FLINK-4587
> URL: https://issues.apache.org/jira/browse/FLINK-4587
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0
> Environment: Latest SNAPSHOT
>Reporter: Renkai Ge
> Attachments: diff in mvn clean package.png, flink-explore-src.zip
>
>
> For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 
> in flink.
> The source file is:
> {code}
> import org.apache.flink.streaming.api.scala._
> import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory
> /**
>   * Created by renkai on 16/9/7.
>   */
> object Main {
>   def main(args: Array[String]): Unit = {
> val instance = ManagedNHttpClientConnectionFactory.INSTANCE
> println("instance = " + instance)
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(1 to 100)
> val result = stream.map { x =>
>   x * 2
> }
> result.print()
> env.execute("xixi")
>   }
> }
> {code}
> and 
> {code}
> name := "flink-explore"
> version := "1.0"
> scalaVersion := "2.11.8"
> crossPaths := false
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
> )
> {code}
> I use `sbt assembly` to get a fat jar.
> If I run the command
> {code}
>  java -cp flink-explore-assembly-1.0.jar Main
> {code}
> I get the result 
> {code}
> instance = 
> org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.api.scala.ClosureCleaner$).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
> 09/07/2016 12:05:26   Job execution switched to status RUNNING.
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to SCHEDULED
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to DEPLOYING
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(20/24) switched to RUNNING
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(19/24) switched to RUNNING
> 15> 30
> 20> 184
> ...
> 19> 182
> 1> 194
> 8> 160
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to FINISHED
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(1/24) switched to FINISHED
> 09/07/2016 12:05:26   Job execution switched to status FINISHED.
> {code}
> Nothing special.
> But if I run the jar by
> {code}
> ./bin/flink run shop-monitor-flink-assembly-1.0.jar
> {code}
> I will get an error
> {code}
> $ ./bin/flink run flink-explore-assembly-1.0.jar
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> 
>  The program finished with the following exception:
> java.lang.NoSuchFieldError: INSTANCE
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:53)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:57)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequest

[jira] [Commented] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE

2016-09-18 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15502373#comment-15502373
 ] 

Renkai Ge commented on FLINK-4587:
--

When I execute {code}mvn clean install -DskipTests{code} in the root directory 
of the source, the jar has both the original and the shaded versions of the classes; 
when I execute the same command in the {code}flink-dist{code} directory, the jar has 
only the shaded ones.

> Yet another java.lang.NoSuchFieldError: INSTANCE
> 
>
> Key: FLINK-4587
> URL: https://issues.apache.org/jira/browse/FLINK-4587
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0
> Environment: Latest SNAPSHOT
>Reporter: Renkai Ge
> Attachments: flink-explore-src.zip
>
>
> For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 
> in flink.
> The source file is:
> {code}
> import org.apache.flink.streaming.api.scala._
> import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory
> /**
>   * Created by renkai on 16/9/7.
>   */
> object Main {
>   def main(args: Array[String]): Unit = {
> val instance = ManagedNHttpClientConnectionFactory.INSTANCE
> println("instance = " + instance)
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(1 to 100)
> val result = stream.map { x =>
>   x * 2
> }
> result.print()
> env.execute("xixi")
>   }
> }
> {code}
> and 
> {code}
> name := "flink-explore"
> version := "1.0"
> scalaVersion := "2.11.8"
> crossPaths := false
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
> )
> {code}
> I use `sbt assembly` to get a fat jar.
> If I run the command
> {code}
>  java -cp flink-explore-assembly-1.0.jar Main
> {code}
> I get the result 
> {code}
> instance = 
> org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.api.scala.ClosureCleaner$).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
> 09/07/2016 12:05:26   Job execution switched to status RUNNING.
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to SCHEDULED
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to DEPLOYING
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(20/24) switched to RUNNING
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(19/24) switched to RUNNING
> 15> 30
> 20> 184
> ...
> 19> 182
> 1> 194
> 8> 160
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to FINISHED
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(1/24) switched to FINISHED
> 09/07/2016 12:05:26   Job execution switched to status FINISHED.
> {code}
> Nothing special.
> But if I run the jar by
> {code}
> ./bin/flink run shop-monitor-flink-assembly-1.0.jar
> {code}
> I will get an error
> {code}
> $ ./bin/flink run flink-explore-assembly-1.0.jar
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> 
>  The program finished with the following exception:
> java.lang.NoSuchFieldError: INSTANCE
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:53)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:57)
>

[jira] [Commented] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE

2016-09-18 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15502120#comment-15502120
 ] 

Renkai Ge commented on FLINK-4587:
--

The
{code}lib/flink-dist_2.10-1.2-SNAPSHOT.jar{code}
has both
{code}org/apache/flink/hadoop/shaded/org/apache/http/message/BasicLineFormatter.class{code}
and
{code}org/apache/http/message/BasicLineFormatter.class{code}
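
If it helps to narrow this down, a small probe along the following lines (purely a diagnostic sketch, not part of the report) can show which copy of the class actually wins at runtime and whether the INSTANCE field added in httpclient/httpcore 4.3 is visible:

{code}
// Diagnostic sketch: print the jar the class was loaded from, then try to read
// the INSTANCE field. Running this via `flink run` vs. plain `java -cp` should
// show whether the unshaded 4.2.x copy inside flink-dist shadows the newer copy
// packaged in the user fat jar.
object ClasspathProbe {
  def main(args: Array[String]): Unit = {
    val cls = Class.forName("org.apache.http.message.BasicLineFormatter")
    println("loaded from: " + cls.getProtectionDomain.getCodeSource.getLocation)
    // A NoSuchFieldException here corresponds to the NoSuchFieldError: INSTANCE
    // seen when the job resolves the class from the old copy.
    println("INSTANCE = " + cls.getField("INSTANCE").get(null))
  }
}
{code}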

> Yet another java.lang.NoSuchFieldError: INSTANCE
> 
>
> Key: FLINK-4587
> URL: https://issues.apache.org/jira/browse/FLINK-4587
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0
> Environment: Latest SNAPSHOT
>Reporter: Renkai Ge
> Attachments: flink-explore-src.zip
>
>
> For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 
> in flink.
> The source file is:
> {code}
> import org.apache.flink.streaming.api.scala._
> import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory
> /**
>   * Created by renkai on 16/9/7.
>   */
> object Main {
>   def main(args: Array[String]): Unit = {
> val instance = ManagedNHttpClientConnectionFactory.INSTANCE
> println("instance = " + instance)
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(1 to 100)
> val result = stream.map { x =>
>   x * 2
> }
> result.print()
> env.execute("xixi")
>   }
> }
> {code}
> and 
> {code}
> name := "flink-explore"
> version := "1.0"
> scalaVersion := "2.11.8"
> crossPaths := false
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
> )
> {code}
> I use `sbt assembly` to get a fat jar.
> If I run the command
> {code}
>  java -cp flink-explore-assembly-1.0.jar Main
> {code}
> I get the result 
> {code}
> instance = 
> org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.api.scala.ClosureCleaner$).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
> 09/07/2016 12:05:26   Job execution switched to status RUNNING.
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to SCHEDULED
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to DEPLOYING
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(20/24) switched to RUNNING
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(19/24) switched to RUNNING
> 15> 30
> 20> 184
> ...
> 19> 182
> 1> 194
> 8> 160
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to FINISHED
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(1/24) switched to FINISHED
> 09/07/2016 12:05:26   Job execution switched to status FINISHED.
> {code}
> Nothing special.
> But if I run the jar by
> {code}
> ./bin/flink run shop-monitor-flink-assembly-1.0.jar
> {code}
> I will get an error
> {code}
> $ ./bin/flink run flink-explore-assembly-1.0.jar
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> 
>  The program finished with the following exception:
> java.lang.NoSuchFieldError: INSTANCE
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:53)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:57)
>   

[jira] [Commented] (FLINK-3373) Using a newer library of Apache HttpClient than 4.2.6 will get class loading problems

2016-09-18 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15500799#comment-15500799
 ] 

Renkai Ge commented on FLINK-3373:
--

[~StephanEwen]
This issue seems to be reproduced in Flink 1.2.0:

https://issues.apache.org/jira/browse/FLINK-4587?filter=-2

> Using a newer library of Apache HttpClient than 4.2.6 will get class loading 
> problems
> -
>
> Key: FLINK-3373
> URL: https://issues.apache.org/jira/browse/FLINK-3373
> Project: Flink
>  Issue Type: Bug
> Environment: Latest Flink snapshot 1.0
>Reporter: Jakob Sultan Ericsson
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> When I try to use Apache HTTP client 4.5.1 in my flink job, it will crash 
> with NoClassDefFound.
> This is because it loads some classes from the provided httpclient 4.2.5/6 in 
> core flink.
> {noformat}
> 17:05:56,193 INFO  org.apache.flink.runtime.taskmanager.Task  
>- DuplicateFilter -> InstallKeyLookup (11/16) switched to FAILED with 
> exception.
> java.lang.NoSuchFieldError: INSTANCE
> at 
> org.apache.http.conn.ssl.SSLConnectionSocketFactory.(SSLConnectionSocketFactory.java:144)
> at 
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.getDefaultRegistry(PoolingHttpClientConnectionManager.java:109)
> at 
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.(PoolingHttpClientConnectionManager.java:116)
> ...
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:305)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:227)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The static initializer of SSLConnectionSocketFactory finds an earlier version of 
> the AllowAllHostnameVerifier that does not have the INSTANCE field (the INSTANCE 
> field was probably added in 4.3).
> {noformat}
> jar tvf lib/flink-dist-1.0-SNAPSHOT.jar |grep AllowAllHostnameVerifier  
>791 Thu Dec 17 09:55:46 CET 2015 
> org/apache/http/conn/ssl/AllowAllHostnameVerifier.class
> {noformat}
> Solutions would be:
> - Fix the classloader so that my custom job does not conflict with internal 
> flink-core classes... pretty hard
> - Remove the dependency somehow.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE

2016-09-06 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge updated FLINK-4587:
-
Attachment: flink-explore-src.zip

> Yet another java.lang.NoSuchFieldError: INSTANCE
> 
>
> Key: FLINK-4587
> URL: https://issues.apache.org/jira/browse/FLINK-4587
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0
> Environment: Latest SNAPSHOT
>Reporter: Renkai Ge
> Attachments: flink-explore-src.zip
>
>
> For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 
> in flink.
> The source file is:
> {code}
> import org.apache.flink.streaming.api.scala._
> import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory
> /**
>   * Created by renkai on 16/9/7.
>   */
> object Main {
>   def main(args: Array[String]): Unit = {
> val instance = ManagedNHttpClientConnectionFactory.INSTANCE
> println("instance = " + instance)
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(1 to 100)
> val result = stream.map { x =>
>   x * 2
> }
> result.print()
> env.execute("xixi")
>   }
> }
> {code}
> and 
> {code}
> name := "flink-explore"
> version := "1.0"
> scalaVersion := "2.11.8"
> crossPaths := false
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
> )
> {code}
> I use `sbt assembly` to get a fat jar.
> If I run the command
> {code}
>  java -cp flink-explore-assembly-1.0.jar Main
> {code}
> I get the result 
> {code}
> instance = 
> org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.api.scala.ClosureCleaner$).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
> 09/07/2016 12:05:26   Job execution switched to status RUNNING.
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to SCHEDULED
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to DEPLOYING
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(20/24) switched to RUNNING
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(19/24) switched to RUNNING
> 15> 30
> 20> 184
> ...
> 19> 182
> 1> 194
> 8> 160
> 09/07/2016 12:05:26   Source: Collection Source(1/1) switched to FINISHED
> ...
> 09/07/2016 12:05:26   Map -> Sink: Unnamed(1/24) switched to FINISHED
> 09/07/2016 12:05:26   Job execution switched to status FINISHED.
> {code}
> Nothing special.
> But if I run the jar by
> {code}
> ./bin/flink run shop-monitor-flink-assembly-1.0.jar
> {code}
> I will get an error
> {code}
> $ ./bin/flink run flink-explore-assembly-1.0.jar
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> 
>  The program finished with the following exception:
> java.lang.NoSuchFieldError: INSTANCE
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:53)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:57)
>   at 
> org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:47)
>   at 
> org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.(ManagedNHttpClientConnectionFactory.java:75)
&

[jira] [Created] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE

2016-09-06 Thread Renkai Ge (JIRA)
Renkai Ge created FLINK-4587:


 Summary: Yet another java.lang.NoSuchFieldError: INSTANCE
 Key: FLINK-4587
 URL: https://issues.apache.org/jira/browse/FLINK-4587
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.2.0
 Environment: Latest SNAPSHOT
Reporter: Renkai Ge


For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 
in flink.
The source file is:
{code}
import org.apache.flink.streaming.api.scala._
import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory

/**
  * Created by renkai on 16/9/7.
  */
object Main {
  def main(args: Array[String]): Unit = {
val instance = ManagedNHttpClientConnectionFactory.INSTANCE
println("instance = " + instance)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(1 to 100)
val result = stream.map { x =>
  x * 2
}
result.print()
env.execute("xixi")
  }
}

{code}

and 
{code}
name := "flink-explore"

version := "1.0"

scalaVersion := "2.11.8"

crossPaths := false

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
)
{code}
I use `sbt assembly` to get a fat jar.

If I run the command
{code}
 java -cp flink-explore-assembly-1.0.jar Main
{code}
I get the result 

{code}
instance = 
org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
log4j:WARN No appenders could be found for logger 
(org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
09/07/2016 12:05:26 Job execution switched to status RUNNING.
09/07/2016 12:05:26 Source: Collection Source(1/1) switched to SCHEDULED
09/07/2016 12:05:26 Source: Collection Source(1/1) switched to DEPLOYING
...
09/07/2016 12:05:26 Map -> Sink: Unnamed(20/24) switched to RUNNING
09/07/2016 12:05:26 Map -> Sink: Unnamed(19/24) switched to RUNNING
15> 30
20> 184
...
19> 182
1> 194
8> 160
09/07/2016 12:05:26 Source: Collection Source(1/1) switched to FINISHED
...
09/07/2016 12:05:26 Map -> Sink: Unnamed(1/24) switched to FINISHED
09/07/2016 12:05:26 Job execution switched to status FINISHED.
{code}

Nothing special.

But if I run the jar by
{code}
./bin/flink run shop-monitor-flink-assembly-1.0.jar
{code}

I will get an error

{code}
$ ./bin/flink run flink-explore-assembly-1.0.jar
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program


 The program finished with the following exception:

java.lang.NoSuchFieldError: INSTANCE
at 
org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:53)
at 
org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:57)
at 
org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:47)
at 
org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.(ManagedNHttpClientConnectionFactory.java:75)
at 
org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.(ManagedNHttpClientConnectionFactory.java:83)
at 
org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.(ManagedNHttpClientConnectionFactory.java:64)
at Main$.main(Main.scala:9)
at Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(Packaged

[jira] [Commented] (FLINK-4532) Allow independent metrics reporter for tasks

2016-08-31 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15452069#comment-15452069
 ] 

Renkai Ge commented on FLINK-4532:
--

[~StephanEwen] So should I close this issue and create a new one for (1), or 
resolve this issue by implementing (1) and wait for others to complete (2)?

> Allow independent metrics reporter for tasks
> 
>
> Key: FLINK-4532
> URL: https://issues.apache.org/jira/browse/FLINK-4532
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>    Reporter: Renkai Ge
>
> Currently, JobManager, TaskManager and Task share the same configuration of 
> reporters, and implementations of 
> org.apache.flink.metrics.reporter.MetricReporter should be on the classpath 
> before the JobManager is started. 
> In a cluster with multi-tenancy, the cluster administrator may want to monitor 
> the cluster metrics in one place, while the owner of an application may want to 
> view the metrics of their application somewhere else, and they may put a custom 
> MetricReporter in the application jar if the classpath doesn't provide one.
> I want to implement this by:
>   1. Making it possible to load MetricReporter from the userCodeClassLoader.
>   2. Putting a new API into ExecutionConfig like {code}public long 
> enableAdditionalMetricReporters(List<String> classNames, boolean 
> disableGlobalMetricReporters);{code}
> I will begin to work on this if you agree.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4532) Allow independent metrics reporter for tasks

2016-08-30 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15450946#comment-15450946
 ] 

Renkai Ge edited comment on FLINK-4532 at 8/31/16 3:33 AM:
---

[~StephanEwen]
Yes, I think your proposal is actually a good way. To achieve this, we need to:
1. Add a command line option so that users can provide an additional flink-conf.yaml; 
the key-value pairs in the additional file may overwrite those in 
FLINK_HOME/conf/flink-conf.yaml.
2. Currently the command line client has two ways to add user classes. One is the 
\-\-classpath option, but this option requires us to put user jars on every machine 
of a YARN cluster or to have an external HTTP server before the JobManager is 
started, which is not very friendly to use. The other is the jar file for the user 
task, but classes in this jar file can only be loaded by the user task, not by the 
JobManager or TaskManager. Should we make the usercodeJar loadable by the JobManager 
and TaskManager, or provide an enhanced version of the \-\-classpath option that 
automatically transfers jars to every YARN/Mesos/etc. node and puts them on the 
classpath before the JobManager/TaskManager is started?


was (Author: renkaige):
[~StephanEwen]
Yes, I think your proposal is actually a good way. To achieve this, we need to:
1. Add a command line option so that users can provide an additional flink-conf.yaml; 
the key-value pairs in the additional file may overwrite those in 
FLINK_HOME/conf/flink-conf.yaml.
2. Currently the command line client has two ways to add user classes. One is the 
'--classpath' option, but this option requires us to put user jars on every machine 
of a YARN cluster or to have an external HTTP server before the JobManager is 
started, which is not very friendly to use. The other is the jar file for the user 
task, but classes in this jar file can only be loaded by the user task, not by the 
JobManager or TaskManager. Should we make the usercodeJar loadable by the JobManager 
and TaskManager, or provide an enhanced version of the '--classpath' option that 
automatically transfers jars to every YARN/Mesos/etc. node and puts them on the 
classpath before the JobManager/TaskManager is started?

> Allow independent metrics reporter for tasks
> 
>
> Key: FLINK-4532
> URL: https://issues.apache.org/jira/browse/FLINK-4532
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>    Reporter: Renkai Ge
>
> Currently, JobManager, TaskManager and Task share the same configuration of 
> reporters, and implementations of 
> org.apache.flink.metrics.reporter.MetricReporter should be on the classpath 
> before the JobManager is started. 
> In a cluster with multi-tenancy, the cluster administrator may want to monitor 
> the cluster metrics in one place, while the owner of an application may want to 
> view the metrics of their application somewhere else, and they may put a custom 
> MetricReporter in the application jar if the classpath doesn't provide one.
> I want to implement this by:
>   1. Making it possible to load MetricReporter from the userCodeClassLoader.
>   2. Putting a new API into ExecutionConfig like {code}public long 
> enableAdditionalMetricReporters(List<String> classNames, boolean 
> disableGlobalMetricReporters);{code}
> I will begin to work on this if you agree.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4532) Allow independent metrics reporter for tasks

2016-08-30 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15450946#comment-15450946
 ] 

Renkai Ge commented on FLINK-4532:
--

[~StephanEwen]
Yes, I think your proposal is actually a good way. To achieve this, we need to:
1. Add a command line option so that users can provide an additional flink-conf.yaml; 
the key-value pairs in the additional file may overwrite those in 
FLINK_HOME/conf/flink-conf.yaml.
2. Currently the command line client has two ways to add user classes. One is the 
'--classpath' option, but this option requires us to put user jars on every machine 
of a YARN cluster or to have an external HTTP server before the JobManager is 
started, which is not very friendly to use. The other is the jar file for the user 
task, but classes in this jar file can only be loaded by the user task, not by the 
JobManager or TaskManager. Should we make the usercodeJar loadable by the JobManager 
and TaskManager, or provide an enhanced version of the '--classpath' option that 
automatically transfers jars to every YARN/Mesos/etc. node and puts them on the 
classpath before the JobManager/TaskManager is started?

> Allow independent metrics reporter for tasks
> 
>
> Key: FLINK-4532
> URL: https://issues.apache.org/jira/browse/FLINK-4532
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>    Reporter: Renkai Ge
>
> Currently, JobManager, TaskManager and Task share the same configuration of 
> reporters, and implementations of 
> org.apache.flink.metrics.reporter.MetricReporter should be on the classpath 
> before the JobManager is started. 
> In a cluster with multi-tenancy, the cluster administrator may want to monitor 
> the cluster metrics in one place, while the owner of an application may want to 
> view the metrics of their application somewhere else, and they may put a custom 
> MetricReporter in the application jar if the classpath doesn't provide one.
> I want to implement this by:
>   1. Making it possible to load MetricReporter from the userCodeClassLoader.
>   2. Putting a new API into ExecutionConfig like {code}public long 
> enableAdditionalMetricReporters(List<String> classNames, boolean 
> disableGlobalMetricReporters);{code}
> I will begin to work on this if you agree.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4532) Allow independent metrics reporter for tasks

2016-08-30 Thread Renkai Ge (JIRA)
Renkai Ge created FLINK-4532:


 Summary: Allow independent metrics reporter for tasks
 Key: FLINK-4532
 URL: https://issues.apache.org/jira/browse/FLINK-4532
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Reporter: Renkai Ge


Currently, JobManager, TaskManager and Task share the same configuration of 
reporters, and implementations of 
org.apache.flink.metrics.reporter.MetricReporter should be on the classpath before 
the JobManager is started. 
In a cluster with multi-tenancy, the cluster administrator may want to monitor the 
cluster metrics in one place, while the owner of an application may want to view the 
metrics of their application somewhere else, and they may put a custom 
MetricReporter in the application jar if the classpath doesn't provide one.

I want to implement this by:
1. Making it possible to load MetricReporter from the userCodeClassLoader.
2. Putting a new API into ExecutionConfig like {code}public long 
enableAdditionalMetricReporters(List<String> classNames, boolean 
disableGlobalMetricReporters);{code}

I will begin to work on this if you agree.
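
A rough sketch of what (1) could look like, assuming a classloader containing the user jar is available at the point where reporters are instantiated (the object and method names here are illustrative only, not existing Flink API):

{code}
import org.apache.flink.metrics.reporter.MetricReporter

// Illustrative only: instantiate reporter classes by name through the
// user-code classloader instead of the system classloader, so that reporters
// shipped inside the application jar can be found.
object ReporterLoading {
  def loadReporters(classNames: Seq[String],
                    userCodeClassLoader: ClassLoader): Seq[MetricReporter] =
    classNames.map { name =>
      Class.forName(name, true, userCodeClassLoader)
        .asSubclass(classOf[MetricReporter])
        .getDeclaredConstructor()
        .newInstance()
    }
}
{code}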



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-2118) Table API fails on composite filter conditions

2016-08-22 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430310#comment-15430310
 ] 

Renkai Ge edited comment on FLINK-2118 at 8/22/16 8:40 AM:
---

[~fhueske][~aljoscha]

Operators ending with '=' have a special property: they are treated as assignment 
operators, which changes how the expression is parsed:
http://scala-lang.org/files/archive/spec/2.11/06-expressions.html#assignment-operators

I think to solve this problem, we may need to replace the operator '===' 
by {code}'!!!' or '=!='{code} or something like these.


was (Author: renkaige):
[~fhueske][~aljoscha]

Operators ending with '=' have a special property: they are treated as assignment 
operators, which changes how the expression is parsed:
http://scala-lang.org/files/archive/spec/2.11/06-expressions.html#assignment-operators

I think to solve this problem, we may need to replace the operator '===' by 
'!!!' or '=!=' or something like these.

> Table API fails on composite filter conditions
> --
>
> Key: FLINK-2118
> URL: https://issues.apache.org/jira/browse/FLINK-2118
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>
> Having a composite filter condition such as 
> {code}
> myTable.filter('name !== "Pete" && 'name !== "Bob")
> {code}
> fails with the following error message:
> {code}
> ExpressionException: Non-boolean operand types String and String in Pete && 
> 'name
> {code}
> whereas 
> {code}
> myTable.filter( ('name !== "Pete") && ('name !== "Bob") )
> {code}
> works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2118) Table API fails on composite filter conditions

2016-08-22 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430310#comment-15430310
 ] 

Renkai Ge commented on FLINK-2118:
--

[~fhueske][~aljoscha]

Operators ending with '=' have a special property: they are treated as assignment 
operators, which changes how the expression is parsed:
http://scala-lang.org/files/archive/spec/2.11/06-expressions.html#assignment-operators

I think to solve this problem, we may need to replace the operator '===' by 
'!!!' or '=!=' or something like these.
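
To make the parsing effect concrete, here is a small self-contained sketch (plain Scala, no Flink types, everything hypothetical) showing that an operator ending in '=' is parsed as an assignment operator and therefore binds more loosely than '&&':

{code}
// Hypothetical stand-ins for the Table API expression types.
final case class Bool(value: Boolean) {
  def &&(other: Bool): Bool = Bool(value && other.value)
}
final case class Field(name: String) {
  def !==(other: Bool): String = s"$name !== ${other.value}"
}

object PrecedenceDemo {
  def main(args: Array[String]): Unit = {
    val f = Field("'name")
    // Because !== ends with '=' (and does not start with '='), the parser treats
    // it as an assignment operator with the lowest precedence, so the expression
    // groups as f.!==(Bool(true) && Bool(false)),
    // not (f !== Bool(true)) && Bool(false).
    println(f !== Bool(true) && Bool(false))  // prints: 'name !== false
  }
}
{code}

The same grouping is what produces the "Pete && 'name" operands in the reported error message.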

> Table API fails on composite filter conditions
> --
>
> Key: FLINK-2118
> URL: https://issues.apache.org/jira/browse/FLINK-2118
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>
> Having a composite filter condition such as 
> {code}
> myTable.filter('name !== "Pete" && 'name !== "Bob")
> {code}
> fails with the following error message:
> {code}
> ExpressionException: Non-boolean operand types String and String in Pete && 
> 'name
> {code}
> whereas 
> {code}
> myTable.filter( ('name !== "Pete") && ('name !== "Bob") )
> {code}
> works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2118) Table API fails on composite filter conditions

2016-08-19 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427903#comment-15427903
 ] 

Renkai Ge commented on FLINK-2118:
--

I want to help solve this issue, but I'm not sure whether I have the ability. I 
will give it a try in the coming days.

> Table API fails on composite filter conditions
> --
>
> Key: FLINK-2118
> URL: https://issues.apache.org/jira/browse/FLINK-2118
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>
> Having a composite filter condition such as 
> {code}
> myTable.filter('name !== "Pete" && 'name !== "Bob")
> {code}
> fails with the following error message:
> {code}
> ExpressionException: Non-boolean operand types String and String in Pete && 
> 'name
> {code}
> whereas 
> {code}
> myTable.filter( ('name !== "Pete") && ('name !== "Bob") )
> {code}
> works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2118) Table API fails on composite filter conditions

2016-08-19 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427893#comment-15427893
 ] 

Renkai Ge commented on FLINK-2118:
--

[~sahitya.pavurala] ! has a higher precedence than && according to the Scala specification:
http://scala-lang.org/files/archive/spec/2.11/06-expressions.html#infix-operations


> Table API fails on composite filter conditions
> --
>
> Key: FLINK-2118
> URL: https://issues.apache.org/jira/browse/FLINK-2118
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>
> Having a composite filter condition such as 
> {code}
> myTable.filter('name !== "Pete" && 'name !== "Bob")
> {code}
> fails with the following error message:
> {code}
> ExpressionException: Non-boolean operand types String and String in Pete && 
> 'name
> {code}
> whereas 
> {code}
> myTable.filter( ('name !== "Pete") && ('name !== "Bob") )
> {code}
> works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2355: [FLINK-4282]Add Offset Parameter to WindowAssigners

2016-08-19 Thread Renkai
Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2355
  
@aljoscha It's fine. I'll try to write well-formatted code manually from now 
on, and I hope the code in the last commit is well formatted enough.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4428) Method map/flatMapWithState may need a eviction policy

2016-08-19 Thread Renkai Ge (JIRA)
Renkai Ge created FLINK-4428:


 Summary: Method map/flatMapWithState may need a eviction policy
 Key: FLINK-4428
 URL: https://issues.apache.org/jira/browse/FLINK-4428
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API
Affects Versions: 1.1.2
Reporter: Renkai Ge


I want to count the number of unique visitors of a website every day.
If the number changes, I want to get the newest number within 1 second, and
it should keep silent if the number doesn't change. I implemented this
with a time window of 1 day, a trigger of 1 second, and flatMapWithState to
filter duplicated results.
{code}
//case class Visit(uuid: String, time: Long, platform: Int)

//case class WindowUv(platform: Int, uv: Long, windowStart: Long, WindowEnd: Long)

//  val consumer: FlinkKafkaConsumer08[Visit]
val stream =
  env.addSource(consumer)
    .keyBy(_.platform)
    .timeWindow(Time.days(1))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
    .applyWith((0, Set.empty[Int], 0l, 0l))(
      foldFunction = {
        case ((_, set, _, 0), visit) =>
          (visit.platform, set + visit.uuid.hashCode, 0, 0)
      },
      windowFunction = {
        case (key, window, results) =>
          results.map {
            case (platform, set, _, _) =>
              (platform, set, window.getStart, window.getEnd)
          }
      }
    )
    .mapWith {
      case (key, set, windowStart, windowEnd) =>
        WindowUv(key, set.size, windowStart, windowEnd)
    }
    .keyBy(uv => (uv.platform, uv.windowStart))
    .flatMapWithState[WindowUv, Int] {
      case ((key, num, begin, end), curr) =>
        curr match {
          case Some(numCurr) if numCurr == num =>
            (Seq.empty, Some(num))
          case _ =>
            (Seq(WindowUv(key, num, begin, end)), Some(num))
        }
    }
stream.print()
env.execute("Boom")
{code}

There is a problem: since I used flatMapWithState, the state of one day will
never be updated and never be used after the day has passed, but it will stay
in memory forever; there is no way to evict it. So I think the state in the map
may need some eviction policy related to time or global conditions, rather than
one tied only to the last message of the key (it's hard to tell whether
a message is the last one when it arrives).
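
To make the limitation concrete, here is a hedged sketch of the only "eviction" currently expressible with flatMapWithState (it reuses the hypothetical WindowUv type and the org.apache.flink.streaming.api.scala._ import from the snippet above; uvStream stands for the DataStream[WindowUv] produced by the mapWith step): returning None as the new state drops the state for the key, but the function only runs when another element for the same key arrives, so keys that stop receiving elements keep their state forever.

{code}
// Sketch only: the state is (lastEmittedUv, lastSeenMillis). Returning None drops
// the state for the key, but that branch is reached only when a new element with
// the same (platform, windowStart) key shows up -- which never happens for windows
// of past days, so their state is never freed.
uvStream
  .keyBy(uv => (uv.platform, uv.windowStart))
  .flatMapWithState[WindowUv, (Long, Long)] {
    case (uv, None) =>
      (Seq(uv), Some((uv.uv, System.currentTimeMillis())))
    case (uv, Some((lastUv, lastSeen))) =>
      if (System.currentTimeMillis() - lastSeen > 24 * 60 * 60 * 1000L)
        (Seq(uv), None)                        // "evict", but only on arrival
      else if (uv.uv == lastUv)
        (Seq.empty, Some((lastUv, lastSeen)))  // duplicate, keep silent
      else
        (Seq(uv), Some((uv.uv, System.currentTimeMillis())))
  }
{code}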



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2355: [FLINK-4282]Add Offset Parameter to WindowAssigners

2016-08-18 Thread Renkai
Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2355
  
I have rearranged the code. It is really hard for me to add a space after 
every comma; do you have any code style preset for IDEs so that I can reformat 
my new code automatically without changing other people's code?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4282) Add Offset Parameter to WindowAssigners

2016-08-17 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425768#comment-15425768
 ] 

Renkai Ge commented on FLINK-4282:
--

Hi, [~aljoscha]. I changed the code according to your last proposal; do you have any 
further ideas on this issue?

> Add Offset Parameter to WindowAssigners
> ---
>
> Key: FLINK-4282
> URL: https://issues.apache.org/jira/browse/FLINK-4282
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Currently, windows are always aligned to EPOCH, which basically means days 
> are aligned with GMT. This is somewhat problematic for people living in 
> different timezones.
> An offset parameter would allow adapting the window assigner to the timezone.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4409) class conflict between jsr305-1.3.9.jar and flink-shaded-hadoop2-1.1.1.jar

2016-08-17 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge updated FLINK-4409:
-
Description: 
It seems all classes in jsr305-1.3.9.jar can be found in 
flink-shaded-hadoop2-1.1.1.jar, too.
I can exclude these jars for a successful assembly and run when using sbt:
{code:none}
libraryDependencies ++= Seq(
  "com.typesafe.play" %% "play-json" % "2.3.8",
  "org.apache.flink" %% "flink-scala" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-streaming-scala" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-clients" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "joda-time" % "joda-time" % "2.9.4",
  "org.scalikejdbc" %% "scalikejdbc" % "2.2.7",
  "mysql" % "mysql-connector-java" % "5.1.15",
  "io.spray" %% "spray-caching" % "1.3.3"
)
{code}
But I think it might be better to remove jsr305 dependency from Flink.

  was:
It seems all classes in jsr305-1.3.9.jar can be found in 
flink-shaded-hadoop2-1.1.1.jar, too.
I can exclude these jars for a successful assembly and run when using sbt:
{code:scala}
libraryDependencies ++= Seq(
  "com.typesafe.play" %% "play-json" % "2.3.8",
  "org.apache.flink" %% "flink-scala" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-streaming-scala" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-clients" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "joda-time" % "joda-time" % "2.9.4",
  "org.scalikejdbc" %% "scalikejdbc" % "2.2.7",
  "mysql" % "mysql-connector-java" % "5.1.15",
  "io.spray" %% "spray-caching" % "1.3.3"
)
{code}
But I think it might be better to remove jsr305 dependency from Flink.


> class conflict between jsr305-1.3.9.jar and flink-shaded-hadoop2-1.1.1.jar
> --
>
> Key: FLINK-4409
> URL: https://issues.apache.org/jira/browse/FLINK-4409
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.1.0
>Reporter: Renkai Ge
>Priority: Minor
>
> It seems all classes in jsr305-1.3.9.jar can also be found in 
> flink-shaded-hadoop2-1.1.1.jar.
> I can exclude these jars for a successful assembly and run when using sbt:
> {code:none}
> libraryDependencies ++= Seq(
>   "com.typesafe.play" %% "play-json" % "2.3.8",
>   "org.apache.flink" %% "flink-scala" % "1.1.1"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.1.1"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-streaming-scala" % "1.1.1"
> exclude("com.google.code.findbugs", "jsr305"),
>   "org.apache.flink" %% "flink-clients" % "1.1.1"
> exclude("com.google.code.findbugs", "jsr305"),
>   "joda-time" % "joda-time" % "2.9.4",
>   "org.scalikejdbc" %% "scalikejdbc" % "2.2.7",
>   "mysql" % "mysql-connector-java" % "5.1.15",
>   "io.spray" %% "spray-caching" % "1.3.3"
> )
> {code}
> But I think it might be better to remove the jsr305 dependency from Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4409) class conflict between jsr305-1.3.9.jar and flink-shaded-hadoop2-1.1.1.jar

2016-08-17 Thread Renkai Ge (JIRA)
Renkai Ge created FLINK-4409:


 Summary: class conflict between jsr305-1.3.9.jar and 
flink-shaded-hadoop2-1.1.1.jar
 Key: FLINK-4409
 URL: https://issues.apache.org/jira/browse/FLINK-4409
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.1.0
Reporter: Renkai Ge
Priority: Minor


It seems all classes in jsr305-1.3.9.jar can also be found in 
flink-shaded-hadoop2-1.1.1.jar.
I can exclude these jars for a successful assembly and run when using sbt:
{code:scala}
libraryDependencies ++= Seq(
  "com.typesafe.play" %% "play-json" % "2.3.8",
  "org.apache.flink" %% "flink-scala" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-streaming-scala" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-clients" % "1.1.1"
exclude("com.google.code.findbugs", "jsr305"),
  "joda-time" % "joda-time" % "2.9.4",
  "org.scalikejdbc" %% "scalikejdbc" % "2.2.7",
  "mysql" % "mysql-connector-java" % "5.1.15",
  "io.spray" %% "spray-caching" % "1.3.3"
)
{code}
But I think it might be better to remove the jsr305 dependency from Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4282) Add Offset Parameter to WindowAssigners

2016-08-13 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15420137#comment-15420137
 ] 

Renkai Ge commented on FLINK-4282:
--

[~aljoscha] I used two mods just to prevent someone from abusing this method. If 
we presume people use the method in good faith and only solve the problem they 
really have, I think it is okay to use just one mod.

> Add Offset Parameter to WindowAssigners
> ---
>
> Key: FLINK-4282
> URL: https://issues.apache.org/jira/browse/FLINK-4282
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Currently, windows are always aligned to EPOCH, which basically means days 
> are aligned with GMT. This is somewhat problematic for people living in 
> different timezones.
> An offset parameter would allow adapting the window assigner to the timezone.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4282) Add Offset Parameter to WindowAssigners

2016-08-12 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15419763#comment-15419763
 ] 

Renkai Ge commented on FLINK-4282:
--

Thanks, [~aljoscha]. It's a brilliant idea, but the result might be larger than 
the timestamp if the offset is too large, so I think we need to apply the modulo 
twice in this method.
{code:java}
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - ((timestamp - offset) % windowSize + windowSize) % windowSize;
}
{code}
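
As a standalone illustration of why the second modulo matters (my own example, not 
part of the PR): Java's % keeps the sign of the dividend, so a single mod can place 
the window start after the timestamp whenever (timestamp - offset) is negative.

{code:java}
// Illustration only (not Flink code): single-mod vs. double-mod window start.
public class WindowOffsetDemo {

    static long singleMod(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset) % windowSize;
    }

    static long doubleMod(long timestamp, long offset, long windowSize) {
        return timestamp - ((timestamp - offset) % windowSize + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long timestamp = 1_000L, offset = 3_000L, windowSize = 5_000L;
        // (1000 - 3000) % 5000 == -2000, so the single mod "rounds up".
        System.out.println(singleMod(timestamp, offset, windowSize));  // 3000, lies after the timestamp
        System.out.println(doubleMod(timestamp, offset, windowSize));  // -2000, window [-2000, 3000) contains 1000
    }
}
{code}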

> Add Offset Parameter to WindowAssigners
> ---
>
> Key: FLINK-4282
> URL: https://issues.apache.org/jira/browse/FLINK-4282
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Currently, windows are always aligned to EPOCH, which basically means days 
> are aligned with GMT. This is somewhat problematic for people living in 
> different timezones.
> An offset parameter would allow adapting the window assigner to the timezone.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2355: [FLINK-4282]Add Offset Parameter to WindowAssigner...

2016-08-11 Thread Renkai
GitHub user Renkai opened a pull request:

https://github.com/apache/flink/pull/2355

[FLINK-4282]Add Offset Parameter to WindowAssigners

Although there is already a merge request for this issue, I think this 
implementation is more sensible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Renkai/flink FLINK-4282

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2355.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2355


commit d31cc8125b30212b6ac21996a48d703eb11354e9
Author: renkai <gaelook...@gmail.com>
Date:   2016-08-11T10:48:50Z

[FLINK-4282]Add Offset Parameter to WindowAssigners




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4282) Add Offset Parameter to WindowAssigners

2016-08-10 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15416602#comment-15416602
 ] 

Renkai Ge commented on FLINK-4282:
--

I have the same opinion as [~aljoscha]: the offset property on TimeWindow is 
redundant. The offset value should only affect the point at which the stream is 
divided into windows, not the window itself. If we want a window with an offset, 
we only need to change lines like 
<https://github.com/apache/flink/blob/90cfe0a7b499832cebc2a53f7b066f83dde17de5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java#L61>
 . And since SessionWindows decide where to divide the stream into windows based 
on the timestamp of the first arriving message, files like 
ProcessingTimeSessionWindows.java, SlidingProcessingTimeWindows.java
and TumblingProcessingTimeWindows.java do not need to be changed for this issue.
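
To make the kind of change I mean concrete, here is a rough standalone sketch (my 
own simplification, not the actual Flink code at the linked line): the assigner's 
loop stays the same, only the computation of the last window start is replaced by 
the offset-aware helper discussed in this thread.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Standalone sketch (no Flink types): which sliding windows a record with a given
// timestamp falls into once the window start is aligned with an offset. The loop
// shape mirrors the assigner code linked above; the helper is the one proposed in
// this thread.
public class SlidingAssignSketch {

    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - ((timestamp - offset) % windowSize + windowSize) % windowSize;
    }

    static List<Long> assignWindowStarts(long timestamp, long size, long slide, long offset) {
        List<Long> starts = new ArrayList<>();
        // before the change: long lastStart = timestamp - timestamp % slide;
        long lastStart = getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10s windows sliding every 5s, aligned 2s off the epoch grid.
        System.out.println(assignWindowStarts(13_000L, 10_000L, 5_000L, 2_000L));
        // prints [12000, 7000] -> windows [12000, 22000) and [7000, 17000)
    }
}
{code}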

> Add Offset Parameter to WindowAssigners
> ---
>
> Key: FLINK-4282
> URL: https://issues.apache.org/jira/browse/FLINK-4282
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Currently, windows are always aligned to EPOCH, which basically means days 
> are aligned with GMT. This is somewhat problematic for people living in 
> different timezones.
> An offset parameter would allow adapting the window assigner to the timezone.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4282) Add Offset Parameter to WindowAssigners

2016-08-10 Thread Renkai Ge (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15416556#comment-15416556
 ] 

Renkai Ge commented on FLINK-4282:
--

If there is no proper solution to this issue yet, I would like to help solve it. 
Since the relation between timezones and offsets is complex, there are many ways 
to handle it (java.util.*, Joda-Time, and java.time.* since Java 8) and people 
will have their own preference, so I think that part should not live in Flink. 
We just provide APIs to pass offsets in, and users choose their own way to 
compute the offset.
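
For example, a user-side sketch of what I mean (the of(size, offset) overload is 
hypothetical here, it is what this issue proposes rather than an existing API): 
the offset is derived from a timezone with java.time and simply handed to the 
assigner.

{code:java}
import java.time.ZoneId;
import java.time.ZonedDateTime;

// User-side sketch: compute the window offset from a timezone and hand it to the
// window assigner. The of(size, offset) overload below is hypothetical (it is the
// API this issue asks for), which is why it only appears in the comment.
public class OffsetFromTimezone {
    public static void main(String[] args) {
        // UTC offset of Asia/Shanghai in milliseconds (+8h, i.e. 28_800_000).
        long offsetMillis = ZonedDateTime.now(ZoneId.of("Asia/Shanghai"))
                .getOffset().getTotalSeconds() * 1000L;
        System.out.println("offset = " + offsetMillis + " ms");

        // Hypothetical usage, e.g. daily windows aligned to local midnight instead
        // of UTC midnight:
        // stream.keyBy(...)
        //       .window(TumblingEventTimeWindows.of(Time.days(1), Time.milliseconds(-offsetMillis)))
        //       .reduce(...);
    }
}
{code}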

> Add Offset Parameter to WindowAssigners
> ---
>
> Key: FLINK-4282
> URL: https://issues.apache.org/jira/browse/FLINK-4282
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Currently, windows are always aligned to EPOCH, which basically means days 
> are aligned with GMT. This is somewhat problematic for people living in 
> different timezones.
> An offset parameter would allow adapting the window assigner to the timezone.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4252) Table program cannot be compiled

2016-07-21 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge updated FLINK-4252:
-
Description: 
I'm trying the Table API.
I got some errors like the following.
My code is in the attachments.

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672)
at TestMain$.main(TestMain.scala:31)
at TestMain.main(TestMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open(Configuration)' method 
in class org.apache.flink.api.table.runtime.FlatMapRunner caused an exception: 
Table program cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
at 
org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.api.table.runtime.FunctionCompiler$class.compile(FunctionCompiler.scala:37)
at 
org.apache.flink.api.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:28)
at 
org.apache.flink.api.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:42)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38

[jira] [Created] (FLINK-4252) Table program cannot be compiled

2016-07-21 Thread Renkai Ge (JIRA)
Renkai Ge created FLINK-4252:


 Summary: Table program cannot be compiled
 Key: FLINK-4252
 URL: https://issues.apache.org/jira/browse/FLINK-4252
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.1.0
 Environment: OS X EI Captain
scala 2.11.7
jdk 8
Reporter: Renkai Ge
 Attachments: TestMain.scala

I'm trying the Table API.
I got some errors like the following.


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672)
at TestMain$.main(TestMain.scala:31)
at TestMain.main(TestMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open(Configuration)' method 
in class org.apache.flink.api.table.runtime.FlatMapRunner caused an exception: 
Table program cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
at 
org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.api.table.runtime.FunctionCompiler$class.compile(FunctionCompiler.scal

[jira] [Updated] (FLINK-4252) Table program cannot be compiled

2016-07-21 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge updated FLINK-4252:
-
Attachment: TestMain.scala

> Table program cannot be compiled
> 
>
> Key: FLINK-4252
> URL: https://issues.apache.org/jira/browse/FLINK-4252
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
> Environment: OS X EI Captain
> scala 2.11.7
> jdk 8
>Reporter: Renkai Ge
> Attachments: TestMain.scala
>
>
> I'm trying the Table API.
> I got some errors like the following.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672)
>   at TestMain$.main(TestMain.scala:31)
>   at TestMain.main(TestMain.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The user defined 'open(Configuration)' method 
> in class org.apache.flink.api.table.runtime.FlatMapRunner caused an 
> exception: Table program cannot be compiled. This is a bug. Please file an 
> issue.
>   at 
> org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
>   at 
> org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
>   at org.