[jira] [Commented] (FLINK-6286) hbase command not found error

2017-04-11 Thread Jinjiang Ling (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965289#comment-15965289
 ] 

Jinjiang Ling commented on FLINK-6286:
--

Hi [~greghogan], I have attached a patch and opened a pull request. Can you review it?

> hbase command not found error
> -
>
> Key: FLINK-6286
> URL: https://issues.apache.org/jira/browse/FLINK-6286
> Project: Flink
>  Issue Type: Bug
>Reporter: Jinjiang Ling
>Priority: Minor
> Attachments: FLINK-6286-0.patch, FLINK-6286-1.patch
>
>
> As I'm using Flink with the HBASE_CONF_DIR env variable set but without HBase installed, I get the error message below.
> {quote}
> *bin/config.sh: line 303: hbase: command not found*
> {quote}
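
For illustration, a minimal sketch (not the actual patch; the HBASE_CLASSPATH variable name is just illustrative) of how a call site like the one at bin/config.sh line 303 can guard against a missing hbase binary:

{code}
# Only ask HBase for its classpath when the binary is actually on the PATH;
# otherwise skip the call and rely on HBASE_CONF_DIR alone.
if command -v hbase >/dev/null 2>&1; then
    HBASE_CLASSPATH=$(hbase classpath)
fi
{code}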





[jira] [Commented] (FLINK-6286) hbase command not found error

2017-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965283#comment-15965283
 ] 

ASF GitHub Bot commented on FLINK-6286:
---

GitHub user lingjinjiang opened a pull request:

https://github.com/apache/flink/pull/3711

[FLINK-6286] [script] Fix the hbase command not found error

When using Flink with the HBASE_CONF_DIR env variable set but without HBase installed, you will get the "hbase command not found" error.

https://issues.apache.org/jira/browse/FLINK-6286

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lingjinjiang/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3711.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3711


commit f613e4b221bf4583574be11fe4c3fd5558290545
Author: lingjinjiang 
Date:   2017-04-12T02:53:52Z

[FLINK-6286] [script] Fix the hbase command not found error




> hbase command not found error
> -
>
> Key: FLINK-6286
> URL: https://issues.apache.org/jira/browse/FLINK-6286
> Project: Flink
>  Issue Type: Bug
>Reporter: Jinjiang Ling
>Priority: Minor
> Attachments: FLINK-6286-0.patch, FLINK-6286-1.patch
>
>
> As I'm using Flink with the HBASE_CONF_DIR env variable set but without HBase installed, I get the error message below.
> {quote}
> *bin/config.sh: line 303: hbase: command not found*
> {quote}





[GitHub] flink pull request #3711: [FLINK-6286] [script] Fix the hbase command not fo...

2017-04-11 Thread lingjinjiang
GitHub user lingjinjiang opened a pull request:

https://github.com/apache/flink/pull/3711

[FLINK-6286] [script] Fix the hbase command not found error

When using Flink with the HBASE_CONF_DIR env variable set but without HBase installed, you will get the "hbase command not found" error.

https://issues.apache.org/jira/browse/FLINK-6286

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lingjinjiang/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3711.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3711


commit f613e4b221bf4583574be11fe4c3fd5558290545
Author: lingjinjiang 
Date:   2017-04-12T02:53:52Z

[FLINK-6286] [script] Fix the hbase command not found error






[jira] [Updated] (FLINK-6286) hbase command not found error

2017-04-11 Thread Jinjiang Ling (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinjiang Ling updated FLINK-6286:
-
Attachment: FLINK-6286-1.patch

Remove an unnecessary echo command.

> hbase command not found error
> -
>
> Key: FLINK-6286
> URL: https://issues.apache.org/jira/browse/FLINK-6286
> Project: Flink
>  Issue Type: Bug
>Reporter: Jinjiang Ling
>Priority: Minor
> Attachments: FLINK-6286-0.patch, FLINK-6286-1.patch
>
>
> As I'm using Flink with the HBASE_CONF_DIR env variable set but without HBase installed, I get the error message below.
> {quote}
> *bin/config.sh: line 303: hbase: command not found*
> {quote}





[GitHub] flink pull request #3705: [FLINK-6286] Fix the hbase command not found error

2017-04-11 Thread lingjinjiang
Github user lingjinjiang closed the pull request at:

https://github.com/apache/flink/pull/3705




[jira] [Commented] (FLINK-6286) hbase command not found error

2017-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965233#comment-15965233
 ] 

ASF GitHub Bot commented on FLINK-6286:
---

Github user lingjinjiang closed the pull request at:

https://github.com/apache/flink/pull/3705


> hbase command not found error
> -
>
> Key: FLINK-6286
> URL: https://issues.apache.org/jira/browse/FLINK-6286
> Project: Flink
>  Issue Type: Bug
>Reporter: Jinjiang Ling
>Priority: Minor
> Attachments: FLINK-6286-0.patch
>
>
> As I'm using Flink with the HBASE_CONF_DIR env variable set but without HBase installed, I get the error message below.
> {quote}
> *bin/config.sh: line 303: hbase: command not found*
> {quote}





[jira] [Created] (FLINK-6298) Local execution is not setting RuntimeContext for RichOutputFormat

2017-04-11 Thread Mateusz Zakarczemny (JIRA)
Mateusz Zakarczemny created FLINK-6298:
--

 Summary: Local execution is not setting RuntimeContext for 
RichOutputFormat
 Key: FLINK-6298
 URL: https://issues.apache.org/jira/browse/FLINK-6298
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.2.0, 1.1.0
Reporter: Mateusz Zakarczemny


RuntimeContext is never set in RichOutputFormat. I tested it in local execution. RichMapFunction is set up correctly.

The following code will never print "//Context set in RichOutputFormat":
{code}

import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
import org.apache.flink.api.common.io.RichOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Startup {
  def main(args: Array[String]): Unit = {

val mapFunction = new RichMapFunction[String, String] {
  def open(taskNumber: Int, numTasks: Int) { getRuntimeContext }
  def map(event: String) = { event }
  override def setRuntimeContext(t: RuntimeContext) = {
println("//Context set in RichMapFunction")
super.setRuntimeContext(t)
  }
}

val outputFormat = new RichOutputFormat[String] {
  override def setRuntimeContext(t: RuntimeContext) = {
println("//Context set in RichOutputFormat")
super.setRuntimeContext(t)
  }
  def open(taskNumber: Int, numTasks: Int) {}
  def writeRecord(event: String) {
println(event)
  }
  def configure(parameters: Configuration): Unit = {}
  def close(): Unit = {}
}

val see = StreamExecutionEnvironment.getExecutionEnvironment
val eventsStream = see.fromElements[String]("A", "B", "C").map(mapFunction)
eventsStream.writeUsingOutputFormat(outputFormat)
see.execute("test-job")
  }
}
{code}
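
A possible workaround until this is fixed (a hedged sketch, not from this report: the wrapper class and its wiring are assumptions) is to wrap the format in a {{RichSinkFunction}} that forwards its own {{RuntimeContext}} before opening the format, e.g. in Java:

{code}
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Wraps a RichOutputFormat and performs the context hand-off that local
// execution reportedly skips.
public class ContextForwardingSink<T> extends RichSinkFunction<T> {

    private final RichOutputFormat<T> format;

    public ContextForwardingSink(RichOutputFormat<T> format) {
        this.format = format;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        format.setRuntimeContext(getRuntimeContext()); // the missing step
        format.configure(parameters);
        format.open(getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(T value) throws Exception {
        format.writeRecord(value);
    }

    @Override
    public void close() throws Exception {
        format.close();
    }
}
{code}

The stream would then be written with {{eventsStream.addSink(new ContextForwardingSink<>(outputFormat))}} instead of {{writeUsingOutputFormat}}.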





[jira] [Created] (FLINK-6297) CEP timeout does not trigger under certain conditions

2017-04-11 Thread Vijayakumar Palaniappan (JIRA)
Vijayakumar Palaniappan created FLINK-6297:
--

 Summary: CEP timeout does not trigger under certain conditions
 Key: FLINK-6297
 URL: https://issues.apache.org/jira/browse/FLINK-6297
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.2.0
Reporter: Vijayakumar Palaniappan


TimeoutPattern does not trigger under certain conditions. The preconditions are:

- Assume a pattern of Event A followed by Event B within 2 seconds
- Periodic watermarks every 1 second
- Assume the following events have arrived:
  - Event A-1 [time: 1 sec]
  - Event B-1 [time: 2 sec]
  - Event A-2 [time: 2 sec]
  - Event A-3 [time: 5 sec]
  - WaterMark [time: 5 sec]

I would expect that after the watermark arrives, the match Event A-1, B-1 is detected and A-2 times out. But the A-2 timeout does not happen.

If I use a punctuated watermark and generate a watermark for every event, it works as expected.
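
For reference, a minimal sketch of the described pattern in the CEP Java API of that era (assumptions: a simple {{Event}} POJO with a {{getType()}} accessor and an existing {{DataStream<Event> input}}; this is illustrative, not code from the report):

{code}
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event A followed by Event B within 2 seconds.
Pattern<Event, ?> pattern = Pattern.<Event>begin("A")
        .where(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event evt) {
                return "A".equals(evt.getType());
            }
        })
        .followedBy("B")
        .where(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event evt) {
                return "B".equals(evt.getType());
            }
        })
        .within(Time.seconds(2));

PatternStream<Event> patternStream = CEP.pattern(input, pattern);
{code}

The timeout branch is then handled with the two-argument {{patternStream.select(timeoutFunction, selectFunction)}}; it is that timeout callback which, per the report, does not fire for A-2 when only periodic watermarks are used.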





[jira] [Updated] (FLINK-6296) Retrieving execution plan fails succeeding execution attempt

2017-04-11 Thread Petr Novotnik (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Petr Novotnik updated FLINK-6296:
-
Description: 
Hello,

calling {{ExecutionEnvironment#getExecutionPlan()}} prior to {{ExecutionEnvironment#execute()}} makes the latter fail with the following message if I try to run my job on YARN, i.e. {{flink run -m yarn-cluster ...}}:

{noformat}
> Caused by: java.lang.RuntimeException: No new data sinks have been defined 
> since the last execution. The last execution refers to the latest call to 
> 'execute()', 'count()', 'collect()', or 'print()'.
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1050)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1032)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:59)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
{noformat}

It works fine when executing the job locally (and it works fine locally or even 
on yarn using the DataStream API, i.e. {{StreamExecutionEnvironment}}). 

As far as I could track it down, the problem seems to be the "clearing" behaviour of {{ContextEnvironment#createProgramPlan(..)}}. This makes me wonder why {{#getExecutionPlan}} uses the one-parameter version of {{#createProgramPlan(..)}}. Is "clearing" the sinks really required as part of {{getExecutionPlan()}}?

From a [code perspective|https://github.com/apache/flink/blob/release-1.1.5/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68], it seems the same issue is present in Flink 1.1.5, but I haven't tested it.
P.


  was:
Hello,

calling {{ExecutionEnvironment#getExecutionPlan()}} prior to {{ExecutionEnvironment#execute()}} makes the latter fail with the following message if I try to run my job on YARN, i.e. {{flink run -m yarn-cluster ...}}:

{noformat}
> Caused by: java.lang.RuntimeException: No new data sinks have been defined 
> since the last execution. The last execution refers to the latest call to 
> 'execute()', 'count()', 'collect()', or 'print()'.
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1050)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1032)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:59)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
{noformat}

It works fine when executing the job locally (and it works fine locally or even 
on yarn using the DataStream API, i.e. {{StreamExecutionEnvironment}}). 

As far as I could track it down, the problem seems to be the "clearing" behaviour of {{ContextEnvironment#createProgramPlan(..)}}. This makes me wonder why {{#getExecutionPlan}} uses the one-parameter version of {{#createProgramPlan(..)}}. Is "clearing" the sinks really required as part of {{getExecutionPlan()}}?

From a [code perspective](https://github.com/apache/flink/blob/release-1.1.5/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68), it seems the same issue is present in Flink 1.1.5, but I haven't tested it.
P.



> Retrieving execution plan fails succeeding execution attempt
> 
>
> Key: FLINK-6296
> URL: https://issues.apache.org/jira/browse/FLINK-6296
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Job-Submission, YARN
>Affects Versions: 1.2.0
>Reporter: Petr Novotnik
>
> Hello,
> calling {{ExecutionEnvironment#getExecutionPlan()}} prior to {{ExecutionEnvironment#execute()}} makes the latter fail with the following message if I try to run my job on YARN, i.e. {{flink run -m yarn-cluster ...}}:
> {noformat}
> > Caused by: java.lang.RuntimeException: No new data sinks have been defined 
> > since the last execution. The last execution refers to the latest call to 
> > 'execute()', 'count()', 'collect()', or 'print()'.
> > at 
> > org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1050)
> > at 
> > org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1032)
> > at 
> > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:59)
> > at 
> > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
> {noformat}
> It works fine when executing the job locally (and it works fine locally or 
> even on yarn using the DataStream API, i.e. {{StreamExecutionEnvironment}}). 
> As far as I could track it down, the problem seems to be the "clearing" behaviour of 
> 

[jira] [Updated] (FLINK-6296) Retrieving execution plan fails succeeding execution attempt

2017-04-11 Thread Petr Novotnik (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Petr Novotnik updated FLINK-6296:
-
Description: 
Hello,

calling {{ExecutionEnvironment#getExecutionPlan()}} prior to {{ExecutionEnvironment#execute()}} makes the latter fail with the following message if I try to run my job on YARN, i.e. {{flink run -m yarn-cluster ...}}:

{noformat}
> Caused by: java.lang.RuntimeException: No new data sinks have been defined 
> since the last execution. The last execution refers to the latest call to 
> 'execute()', 'count()', 'collect()', or 'print()'.
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1050)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1032)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:59)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
{noformat}

It works fine when executing the job locally (and it works fine locally or even 
on yarn using the DataStream API, i.e. {{StreamExecutionEnvironment}}). 

As far as I could track it down, the problem seems to be the "clearing" behaviour of {{ContextEnvironment#createProgramPlan(..)}}. This makes me wonder why {{#getExecutionPlan}} uses the one-parameter version of {{#createProgramPlan(..)}}. Is "clearing" the sinks really required as part of {{getExecutionPlan()}}?

From a [code perspective](https://github.com/apache/flink/blob/release-1.1.5/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68), it seems the same issue is present in Flink 1.1.5, but I haven't tested it.
P.


  was:
Hello,

calling {{ExecutionEnvironment#getExecutionPlan()}} prior to {{ExecutionEnvironment#execute()}} makes the latter fail with the following message if I try to run my job on YARN, i.e. {{flink run -m yarn-cluster ...}}:

{noformat}
> Caused by: java.lang.RuntimeException: No new data sinks have been defined 
> since the last execution. The last execution refers to the latest call to 
> 'execute()', 'count()', 'collect()', or 'print()'.
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1050)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1032)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:59)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
{noformat}

It works fine when executing the job locally (and it works fine locally or even 
on yarn using the DataStream API, i.e. {{StreamExecutionEnvironment}}). 

As far as I could track it down, the problem seems to be the "clearing" behaviour of {{ContextEnvironment#createProgramPlan(..)}}. This makes me wonder why {{#getExecutionPlan}} uses the one-parameter version of {{#createProgramPlan(..)}}. Is "clearing" the sinks really required as part of {{getExecutionPlan()}}?

I just checked: in Flink 1.1.5, {{#getExecutionPlan(..)}} invokes {{#createProgramPlan("unnamed job", false)}}; hence, it would be good to document this as a breaking change [in the migration guide|https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/upgrading.html], since the resulting error message is rather misleading.
P.


> Retrieving execution plan fails succeeding execution attempt
> 
>
> Key: FLINK-6296
> URL: https://issues.apache.org/jira/browse/FLINK-6296
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Job-Submission, YARN
>Affects Versions: 1.2.0
>Reporter: Petr Novotnik
>
> Hello,
> calling {{ExecutionEnvironment#getExecutionPlan()}} prior to {{ExecutionEnvironment#execute()}} makes the latter fail with the following message if I try to run my job on YARN, i.e. {{flink run -m yarn-cluster ...}}:
> {noformat}
> > Caused by: java.lang.RuntimeException: No new data sinks have been defined 
> > since the last execution. The last execution refers to the latest call to 
> > 'execute()', 'count()', 'collect()', or 'print()'.
> > at 
> > org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1050)
> > at 
> > org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1032)
> > at 
> > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:59)
> > at 
> > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
> {noformat}
> It works fine when executing the job locally (and it works fine locally or 
> even on yarn using the DataStream API, i.e. {{StreamExecutionEnvironment}}). 
> So far I could 

[jira] [Created] (FLINK-6296) Retrieving execution plan fails succeeding execution attempt

2017-04-11 Thread Petr Novotnik (JIRA)
Petr Novotnik created FLINK-6296:


 Summary: Retrieving execution plan fails succeeding execution 
attempt
 Key: FLINK-6296
 URL: https://issues.apache.org/jira/browse/FLINK-6296
 Project: Flink
  Issue Type: Bug
  Components: DataSet API, Job-Submission, YARN
Affects Versions: 1.2.0
Reporter: Petr Novotnik


Hello,

calling {{ExecutionEnvironment#getExecutionPlan()}} prior to {{ExecutionEnvironment#execute()}} makes the latter fail with the following message if I try to run my job on YARN, i.e. {{flink run -m yarn-cluster ...}}:

{noformat}
> Caused by: java.lang.RuntimeException: No new data sinks have been defined 
> since the last execution. The last execution refers to the latest call to 
> 'execute()', 'count()', 'collect()', or 'print()'.
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1050)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1032)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:59)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
{noformat}

It works fine when executing the job locally (and it works fine locally or even 
on yarn using the DataStream API, i.e. {{StreamExecutionEnvironment}}). 

As far as I could track it down, the problem seems to be the "clearing" behaviour of {{ContextEnvironment#createProgramPlan(..)}}. This makes me wonder why {{#getExecutionPlan}} uses the one-parameter version of {{#createProgramPlan(..)}}. Is "clearing" the sinks really required as part of {{getExecutionPlan()}}?

I just checked: in Flink 1.1.5, {{#getExecutionPlan(..)}} invokes {{#createProgramPlan("unnamed job", false)}}; hence, it would be good to document this as a breaking change [in the migration guide|https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/upgrading.html], since the resulting error message is rather misleading.
P.
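
For reference, a minimal sketch that reproduces the reported sequence (assumption: any sink works; {{DiscardingOutputFormat}} is just a convenient one):

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class ExecutionPlanRepro {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.fromElements("a", "b", "c");
        data.output(new DiscardingOutputFormat<String>());

        // createProgramPlan(..) inside getExecutionPlan() clears the sinks ...
        System.out.println(env.getExecutionPlan());

        // ... so this fails with "No new data sinks have been defined ..."
        // when submitted through a ContextEnvironment (e.g. -m yarn-cluster).
        env.execute("repro");
    }
}
{code}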





[jira] [Assigned] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-04-11 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6275:
---

Assignee: mingleizhang

> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside the synchronized block.
> @GuardedBy indicates that access should be protected.
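
A minimal sketch of the usual remedy (assumptions: {{lock}} stands for whatever monitor the {{@GuardedBy}} annotation names, and the field/type names mirror the snippet above): read the guarded field while holding its lock, then wait outside the critical section so the potentially long {{get()}} does not block other guarded code:

{code}
final ResourceManager rm;
synchronized (lock) {
    // read the @GuardedBy field under its lock
    rm = resourceManager;
}
// wait for the resource manager to finish, outside the critical section
rm.getTerminationFuture().get();
{code}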





[jira] [Commented] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15964530#comment-15964530
 ] 

mingleizhang commented on FLINK-6143:
-

[~Zentol] Thanks for your help; the PR is ready now. Could you please review it? Many thanks.

> Unprotected access to this.flink in LocalExecutor#endSession()
> --
>
> Key: FLINK-6143
> URL: https://issues.apache.org/jira/browse/FLINK-6143
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   public void endSession(JobID jobID) throws Exception {
> LocalFlinkMiniCluster flink = this.flink;
> if (flink != null) {
> {code}
> The flink field is not declared volatile and access to this.flink doesn't 
> hold the LocalExecutor.lock
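
For context, a minimal sketch (not the actual patch; {{lock}} is assumed to be the executor's existing monitor) of the two standard remedies for this kind of racy read:

{code}
// Option 1: declare the field volatile so the unsynchronized read is safe.
private volatile LocalFlinkMiniCluster flink;

// Option 2: take the same lock that guards writes to the field.
public void endSession(JobID jobID) throws Exception {
    synchronized (lock) {
        LocalFlinkMiniCluster flink = this.flink;
        if (flink != null) {
            // ... existing session cleanup, unchanged ...
        }
    }
}
{code}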





[jira] [Commented] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()

2017-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15964524#comment-15964524
 ] 

ASF GitHub Bot commented on FLINK-6143:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/3710

[FLINK-6143] [clients] Fix unprotected access to this.flink in LocalE…

…xecutor#endSession.

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink_6143

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3710.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3710


commit 83904c4e736d17cb32ffcbf1d2f90b5521837284
Author: zhangminglei 
Date:   2017-04-11T15:29:50Z

[FLINK-6143] [clients] Fix unprotected access to this.flink in 
LocalExecutor#endSession.




> Unprotected access to this.flink in LocalExecutor#endSession()
> --
>
> Key: FLINK-6143
> URL: https://issues.apache.org/jira/browse/FLINK-6143
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   public void endSession(JobID jobID) throws Exception {
> LocalFlinkMiniCluster flink = this.flink;
> if (flink != null) {
> {code}
> The flink field is not declared volatile and access to this.flink doesn't 
> hold the LocalExecutor.lock





[GitHub] flink pull request #3710: [FLINK-6143] [clients] Fix unprotected access to t...

2017-04-11 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/3710

[FLINK-6143] [clients] Fix unprotected access to this.flink in LocalE…

…xecutor#endSession.

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink_6143

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3710.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3710


commit 83904c4e736d17cb32ffcbf1d2f90b5521837284
Author: zhangminglei 
Date:   2017-04-11T15:29:50Z

[FLINK-6143] [clients] Fix unprotected access to this.flink in 
LocalExecutor#endSession.






[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

2017-04-11 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3700#discussion_r110902190
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedSortProcessFunction.scala
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, 
FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ ArrayList, LinkedList, List => JList }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import 
org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
+
+/**
+ * Process Function used for the aggregate in bounded proc-time OVER window
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param aggregates the 
[[org.apache.flink.table.functions.aggfunctions.SortAggFunction]]
+ *   used for this sort aggregation
+ * @param fieldCount Is used to indicate fields in the current element to 
forward
+ * @param aggType It is used to mark the Aggregate type
+ */
+class ProcTimeUnboundedSortProcessFunction(
+  private val aggregates: MultiOutputAggregateFunction[_],
--- End diff --

@fhueske OK, I will move the implementation into a ProcessFunction.
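
For orientation, a hedged sketch (class, state, and parameter names are assumptions, not code from this PR) of the ProcessFunction-based shape being suggested: buffer incoming rows in keyed state and emit them sorted when a processing-time timer fires:

{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class ProcTimeSortFunction extends ProcessFunction<Row, Row> {

    private final TypeInformation<Row> rowType;
    private final Comparator<Row> comparator; // must be Serializable in practice
    private transient ListState<Row> buffer;

    public ProcTimeSortFunction(TypeInformation<Row> rowType, Comparator<Row> comparator) {
        this.rowType = rowType;
        this.comparator = comparator;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("sortBuffer", rowType));
    }

    @Override
    public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
        buffer.add(row);
        // One timer per processing-time tick; duplicate registrations coalesce.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
        // Drain the buffered rows, sort them, and emit in order.
        List<Row> rows = new ArrayList<>();
        for (Row r : buffer.get()) {
            rows.add(r);
        }
        buffer.clear();
        rows.sort(comparator);
        for (Row r : rows) {
            out.collect(r);
        }
    }
}
{code}

As the diff's {{NullByteKeySelector}} import suggests, the input stream can be keyed on a constant key so that the keyed {{ListState}} above is available.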




[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

2017-04-11 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3700#discussion_r110901541
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, 
DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => 
DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, 
TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  calc: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  calc,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + calc.getId())
+  }
+
+  override def toString: String = {
+s"Sort($calc)"+
+  s"on fields: (${calc.collation.getFieldCollations})"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("aggregate", calc)
+  .item("sort fields",calc.collation.getFieldCollations)
+  .itemIf("offset", calc.offset, calc.offset!=null)
+  .itemIf("fetch", calc.fetch, calc.fetch!=null)
+  .item("input", inputNode)
+  }
+
+  override def 

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

2017-04-11 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3700#discussion_r110900961
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SortAggFunction.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import scala.collection.mutable.ArrayBuffer
+import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.aggregate.UntypedOrdering
+import 
org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
+import java.util.{ List => JList,ArrayList }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.calcite.rel.RelFieldCollation.Direction
+
+/** The initial accumulator for Sort aggregate function */
+class SortAccumulator extends ArrayBuffer[JTuple2[Row,Row]] with 
Accumulator with Serializable
+
+/**
+  * Base class for built-in Min aggregate function
+  *
+  * @tparam K the type for the key sort type
+  * @tparam T the type for the aggregation result
+  */
+abstract class SortAggFunction[K,T](
+val keyIndexes: Array[Int],
+val keySortDirections: Array[Direction],
+val orderings: Array[UntypedOrdering]) extends 
MultiOutputAggregateFunction[T] {
+
+  override def createAccumulator(): Accumulator = {
+val acc = new SortAccumulator
+  
+acc
+  }
+
+  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+if (value != null) {
+  val v = value.asInstanceOf[Row]
+  val acc = accumulator.asInstanceOf[SortAccumulator]
+  
+  var i = 0
+  //create the (compose) key of the new value
+  val keyV = new Row(keyIndexes.size)
+  while (i i += 1 //same key and need to sort on consequent keys 
+  case g if g > 0 => {
+acc.insert(j, new JTuple2(keyV,v)) //add new element in place
--- End diff --

True! I would change the data structures.




[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

2017-04-11 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3700#discussion_r110900641
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SortAggFunction.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import scala.collection.mutable.ArrayBuffer
+import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.aggregate.UntypedOrdering
+import 
org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
+import java.util.{ List => JList,ArrayList }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.calcite.rel.RelFieldCollation.Direction
+
+/** The initial accumulator for Sort aggregate function */
+class SortAccumulator extends ArrayBuffer[JTuple2[Row,Row]] with 
Accumulator with Serializable
+
+/**
+  * Base class for built-in Min aggregate function
+  *
+  * @tparam K the type for the key sort type
+  * @tparam T the type for the aggregation result
+  */
+abstract class SortAggFunction[K,T](
--- End diff --

@fhueske 
Both are fine; I do not think one is particularly simpler than the other. The reason I chose the aggregate-based implementation was the initial discussions about treating this as an aggregation as well. However, I can make another version in which I move this into the ProcessFunction.





[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

2017-04-11 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3700#discussion_r110900086
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, 
TypeInformation }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.aggfunctions.RowSortAggFunction
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It encapsulates as well implementation for ordering and generic 
interfaces
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates 
[org.apache.flink.streaming.api.functions.ProcessFunction] for sorting 
+   * elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+calcSort: LogicalSort,
+inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+val keySortFields = getSortFieldIndexList(calcSort)
+val keySortDirections = getSortFieldDirectionList(calcSort)
+val sortAggregates = createSortAggregation(inputType, 
keySortFields,keySortDirections, false)
+
+val aggType = createSingleAccumulatorRowType(sortAggregates)
+
+   new ProcTimeUnboundedSortProcessFunction(
+  sortAggregates,
+  inputType.getFieldCount,
+  aggType)
+
+  }
+
+  
+   /**
+   * Function creates a sorting aggregation object 
+   * elements based on proctime and potentially other fields
+   * @param inputType input row type
+   * @param keyIndex the indexes of the fields on which the sorting is 
done. 
+   * @param keySortDirections the directions of the sorts for each field. 
+   * First is expected to be the time  
+   * @return SortAggregationFunction
+   */
+  private def createSortAggregation(
+inputType: RelDataType,
+keyIndex: Array[Int],
+keySortDirections: Array[Direction],
+retraction: Boolean): MultiOutputAggregateFunction[_] = {
+
+val orderings = createOrderingComparison(inputType, keyIndex)
+
+val sortKeyType = toKeySortInternalRowTypeInfo(inputType, 
keyIndex).asInstanceOf[RowTypeInfo]
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+val sortAggFunc = new RowSortAggFunction(keyIndex,
+keySortDirections, orderings, rowTypeInfo, sortKeyType)
+
+sortAggFunc
+
+  }
+  
+   /**
+   * Function creates a typed based comparison objects 
+   * @param inputType input row type
+   * @param keyIndex the indexes of the fields on which the sorting is 
done. 
+   * First is expected to be the time  
+   * @return Array of ordering objects
+ 

[jira] [Commented] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency

2017-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15964253#comment-15964253
 ] 

ASF GitHub Bot commented on FLINK-6295:
---

GitHub user WangTaoTheTonic opened a pull request:

https://github.com/apache/flink/pull/3709

[FLINK-6295]use LoadingCache instead of WeakHashMap to lower latency

Now in ExecutionGraphHolder, which is used in many handlers, we use a 
WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage 
collection.

The latency is too high when the JVM rarely runs GC, which leaves the status of jobs or their tasks out of sync with the real ones. (We once observed that the web UI still showed tasks as cancelled/failed for **30+ mins** after their actual states had returned to normal, until a GC happened.)

LoadingCache is a commonly used cache implementation from the Guava library; we can use its time-based eviction to lower the latency of status updates, as sketched below.
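
A minimal sketch of that idea (assumptions: the 30-second expiry is illustrative, and {{fetchExecutionGraphFromJobManager}} is a hypothetical loader, not the PR's actual code):

{code}
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;

LoadingCache<JobID, ExecutionGraph> graphs = CacheBuilder.newBuilder()
        // Evict by time instead of relying on GC, so a stale graph is
        // refreshed within a bounded interval rather than after minutes.
        .expireAfterWrite(30, TimeUnit.SECONDS)
        .build(new CacheLoader<JobID, ExecutionGraph>() {
            @Override
            public ExecutionGraph load(JobID jobId) throws Exception {
                return fetchExecutionGraphFromJobManager(jobId); // hypothetical
            }
        });
{code}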

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/WangTaoTheTonic/flink FLINK-6295

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3709.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3709


commit d76ced06242623d150f9ad09205e2b92f910c1a1
Author: WangTaoTheTonic 
Date:   2017-04-11T11:48:52Z

use LoadingCache instead of WeakHashMap to lower latency




> use LoadingCache instead of WeakHashMap to lower latency
> 
>
> Key: FLINK-6295
> URL: https://issues.apache.org/jira/browse/FLINK-6295
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> Now in ExecutionGraphHolder, which is used in many handlers, we use a 
> WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage 
> collection.
> The latency is too high when the JVM rarely runs GC, which leaves the status of jobs or their tasks out of sync with the real ones.
> LoadingCache is a commonly used cache implementation from the Guava library; we can use its time-based eviction to lower the latency of status updates.





[GitHub] flink pull request #3709: [FLINK-6295]use LoadingCache instead of WeakHashMa...

2017-04-11 Thread WangTaoTheTonic
GitHub user WangTaoTheTonic opened a pull request:

https://github.com/apache/flink/pull/3709

[FLINK-6295]use LoadingCache instead of WeakHashMap to lower latency

Now in ExecutionGraphHolder, which is used in many handlers, we use a 
WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage 
collection.

The latency is too high when the JVM rarely runs GC, which leaves the status of jobs or their tasks out of sync with the real ones. (We once observed that the web UI still showed tasks as cancelled/failed for **30+ mins** after their actual states had returned to normal, until a GC happened.)

LoadingCache is a commonly used cache implementation from the Guava library; we can use its time-based eviction to lower the latency of status updates.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/WangTaoTheTonic/flink FLINK-6295

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3709.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3709


commit d76ced06242623d150f9ad09205e2b92f910c1a1
Author: WangTaoTheTonic 
Date:   2017-04-11T11:48:52Z

use LoadingCache instead of WeakHashMap to lower latency






[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-11 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15964250#comment-15964250
 ] 

Stefano Bortoli commented on FLINK-6250:


According to the JIRA issue opened against Calcite, there is no clear plan to solve the DISTINCT aggregation problem. You mentioned that we could work around it in Flink and then remove the workaround once Calcite supports it. I am not sure where I should start. Is it something like the LogicalAggregation?

> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT COUNT(b), SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`





[jira] [Assigned] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency

2017-04-11 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang reassigned FLINK-6295:
---

Assignee: Tao Wang

> use LoadingCache instead of WeakHashMap to lower latency
> 
>
> Key: FLINK-6295
> URL: https://issues.apache.org/jira/browse/FLINK-6295
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> Now in ExecutionGraphHolder, which is used in many handlers, we use a 
> WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage 
> collection.
> The latency is too high when the JVM rarely runs GC, which leaves the status of jobs or their tasks out of sync with the real ones.
> LoadingCache is a commonly used cache implementation from the Guava library; we can use its time-based eviction to lower the latency of status updates.





[jira] [Assigned] (FLINK-5901) DAG can not show properly in IE

2017-04-11 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang reassigned FLINK-5901:
---

Assignee: Tao Wang

> DAG can not show properly in IE
> ---
>
> Key: FLINK-5901
> URL: https://issues.apache.org/jira/browse/FLINK-5901
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
> Environment: IE 11
>Reporter: Tao Wang
>Assignee: Tao Wang
>Priority: Critical
> Attachments: using chrom(same job).png, using IE.png
>
>
> The DAG of running jobs cannot be shown properly in IE 11 (I am using 11.0.9600.18059, but assume the same for IE 9). The description of the task is not shown within the rectangle.
> Chrome works well. I pasted the screenshots taken under IE and Chrome below.





[jira] [Assigned] (FLINK-5901) DAG can not show properly in IE

2017-04-11 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang reassigned FLINK-5901:
---

Assignee: (was: Tao Wang)

> DAG can not show properly in IE
> ---
>
> Key: FLINK-5901
> URL: https://issues.apache.org/jira/browse/FLINK-5901
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
> Environment: IE 11
>Reporter: Tao Wang
>Priority: Critical
> Attachments: using chrom(same job).png, using IE.png
>
>
> The DAG of running jobs cannot be shown properly in IE 11 (I am using 11.0.9600.18059, but assume the same for IE 9). The description of the task is not shown within the rectangle.
> Chrome works well. I pasted the screenshots taken under IE and Chrome below.





[jira] [Created] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency

2017-04-11 Thread Tao Wang (JIRA)
Tao Wang created FLINK-6295:
---

 Summary: use LoadingCache instead of WeakHashMap to lower latency
 Key: FLINK-6295
 URL: https://issues.apache.org/jira/browse/FLINK-6295
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Reporter: Tao Wang


Now in ExecutionGraphHolder, which is used in many handlers, we use a 
WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage 
collection.

The latency is too high when the JVM rarely runs GC, which leaves the status of jobs or their tasks out of sync with the real ones.

LoadingCache is a commonly used cache implementation from the Guava library; we can use its time-based eviction to lower the latency of status updates.





[jira] [Commented] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()

2017-04-11 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15964188#comment-15964188
 ] 

Chesnay Schepler commented on FLINK-6143:
-

[~mingleizhang] I've given you contributor permissions; you can now assign 
issues to yourself.

> Unprotected access to this.flink in LocalExecutor#endSession()
> --
>
> Key: FLINK-6143
> URL: https://issues.apache.org/jira/browse/FLINK-6143
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public void endSession(JobID jobID) throws Exception {
> LocalFlinkMiniCluster flink = this.flink;
> if (flink != null) {
> {code}
> The flink field is not declared volatile, and access to this.flink does not 
> hold the LocalExecutor lock.
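A minimal sketch of the kind of fix the report implies, under the assumption 
that reads must either see a volatile field or happen under the same lock that 
guards writes; the names below are placeholders, not the actual LocalExecutor 
code:

{code:language=java}
public class LocalExecutorSketch {

    private final Object lock = new Object();

    // Guarded by "lock"; alternatively declare it volatile and drop the
    // synchronized block below.
    private AutoCloseable flink; // stands in for LocalFlinkMiniCluster

    public void endSession() throws Exception {
        final AutoCloseable cluster;
        synchronized (lock) {      // read the shared field under the lock
            cluster = this.flink;
        }
        if (cluster != null) {     // then act on the local copy
            cluster.close();
        }
    }
}
{code}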



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()

2017-04-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-6143:
---

Assignee: mingleizhang

> Unprotected access to this.flink in LocalExecutor#endSession()
> --
>
> Key: FLINK-6143
> URL: https://issues.apache.org/jira/browse/FLINK-6143
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   public void endSession(JobID jobID) throws Exception {
> LocalFlinkMiniCluster flink = this.flink;
> if (flink != null) {
> {code}
> The flink field is not declared volatile, and access to this.flink does not 
> hold the LocalExecutor lock.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964153#comment-15964153
 ] 

mingleizhang edited comment on FLINK-6143 at 4/11/17 10:37 AM:
---

[~tedyu] Hi, could you please assign this JIRA to me? I will work on this 
soon. Thanks.


was (Author: mingleizhang):
[~tedyu] Hi, Could you please assgin this jira to me ? I will work on this 
soon. Thanks.

> Unprotected access to this.flink in LocalExecutor#endSession()
> --
>
> Key: FLINK-6143
> URL: https://issues.apache.org/jira/browse/FLINK-6143
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public void endSession(JobID jobID) throws Exception {
> LocalFlinkMiniCluster flink = this.flink;
> if (flink != null) {
> {code}
> The flink field is not declared volatile, and access to this.flink does not 
> hold the LocalExecutor lock.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964153#comment-15964153
 ] 

mingleizhang commented on FLINK-6143:
-

[~tedyu] Hi, could you please assign this JIRA to me? I will work on this 
soon. Thanks.

> Unprotected access to this.flink in LocalExecutor#endSession()
> --
>
> Key: FLINK-6143
> URL: https://issues.apache.org/jira/browse/FLINK-6143
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public void endSession(JobID jobID) throws Exception {
> LocalFlinkMiniCluster flink = this.flink;
> if (flink != null) {
> {code}
> The flink field is not declared volatile, and access to this.flink does not 
> hold the LocalExecutor lock.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6293) Flakey JobManagerITCase

2017-04-11 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-6293:
---
Component/s: (was: Job-Submission)
 JobManager

> Flakey JobManagerITCase
> ---
>
> Key: FLINK-6293
> URL: https://issues.apache.org/jira/browse/FLINK-6293
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Tests
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>
> Quite rarely, {{JobManagerITCase}} seems to hang, e.g. see 
> https://api.travis-ci.org/jobs/220888193/log.txt?deansi=true 
> The Maven watchdog kills the build because no output is produced within 
> 300s, and {{JobManagerITCase}} seems to hang at line 772, i.e.
> {code:title=JobManagerITCase lines 
> 770-772|language=java|linenumbers=true|firstline=770}
> // Trigger savepoint for non-existing job
> jobManager.tell(TriggerSavepoint(jobId, Option.apply("any")), testActor)
> val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
> {code}
> Although the (downloaded) logs do not quite allow a precise mapping to this 
> test case, it looks as if the following block may be related:
> {code}
> 09:34:47,684 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Akka ask timeout set to 100s
> 09:34:47,777 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Disabled queryable state server
> 09:34:47,777 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Starting FlinkMiniCluster.
> 09:34:47,809 INFO  akka.event.slf4j.Slf4jLogger   
>- Slf4jLogger started
> 09:34:47,837 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Created BLOB server storage directory 
> /tmp/blobStore-eab23d04-ea18-4dc5-b1df-fcf9fc295062
> 09:34:47,838 WARN  org.apache.flink.runtime.net.SSLUtils  
>- Not a SSL socket, will skip setting tls version and cipher suites.
> 09:34:47,839 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Started BLOB server at 0.0.0.0:36745 - max concurrent requests: 50 - max 
> backlog: 1000
> 09:34:47,840 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>- No metrics reporter configured, no metrics will be exposed/reported.
> 09:34:47,850 INFO  
> org.apache.flink.runtime.testingUtils.TestingMemoryArchivist  - Started 
> memory archivist akka://flink/user/archive_1
> 09:34:47,860 INFO  org.apache.flink.runtime.testutils.TestingResourceManager  
>- Trying to associate with JobManager leader akka://flink/user/jobmanager_1
> 09:34:47,861 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager
>- Starting JobManager at akka://flink/user/jobmanager_1.
> 09:34:47,862 WARN  org.apache.flink.runtime.testingUtils.TestingJobManager
>- Discard message 
> LeaderSessionMessage(----,TriggerSavepoint(6e813070338a23b0ff571646bca56521,Some(any)))
>  because there is currently no valid leader id known.
> 09:34:47,862 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager
>- JobManager akka://flink/user/jobmanager_1 was granted leadership with 
> leader session ID Some(----).
> 09:34:47,867 INFO  org.apache.flink.runtime.testutils.TestingResourceManager  
>- Resource Manager associating with leading JobManager 
> Actor[akka://flink/user/jobmanager_1#-652927556] - leader session 
> ----
> {code}
> If so, then this may be related to FLINK-6287 and may possibly even be a 
> duplicate.
> What is strange, though, is that the timeout for the expected message to 
> arrive is no more than 2 minutes, so the test should properly fail within 300s.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6293) Flakey JobManagerITCase

2017-04-11 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964138#comment-15964138
 ] 

Nico Kruber commented on FLINK-6293:


Same here (with only the {{transfer.sh}} upload changed compared to master)
https://s3.amazonaws.com/archive.travis-ci.org/jobs/220888197/log.txt

> Flakey JobManagerITCase
> ---
>
> Key: FLINK-6293
> URL: https://issues.apache.org/jira/browse/FLINK-6293
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, Tests
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>
> Quite rarely, {{JobManagerITCase}} seems to hang, e.g. see 
> https://api.travis-ci.org/jobs/220888193/log.txt?deansi=true 
> The Maven watchdog kills the build because no output is produced within 
> 300s, and {{JobManagerITCase}} seems to hang at line 772, i.e.
> {code:title=JobManagerITCase lines 
> 770-772|language=java|linenumbers=true|firstline=770}
> // Trigger savepoint for non-existing job
> jobManager.tell(TriggerSavepoint(jobId, Option.apply("any")), testActor)
> val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
> {code}
> Although the (downloaded) logs do not quite allow a precise mapping to this 
> test case, it looks as if the following block may be related:
> {code}
> 09:34:47,684 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Akka ask timeout set to 100s
> 09:34:47,777 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Disabled queryable state server
> 09:34:47,777 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Starting FlinkMiniCluster.
> 09:34:47,809 INFO  akka.event.slf4j.Slf4jLogger   
>- Slf4jLogger started
> 09:34:47,837 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Created BLOB server storage directory 
> /tmp/blobStore-eab23d04-ea18-4dc5-b1df-fcf9fc295062
> 09:34:47,838 WARN  org.apache.flink.runtime.net.SSLUtils  
>- Not a SSL socket, will skip setting tls version and cipher suites.
> 09:34:47,839 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Started BLOB server at 0.0.0.0:36745 - max concurrent requests: 50 - max 
> backlog: 1000
> 09:34:47,840 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>- No metrics reporter configured, no metrics will be exposed/reported.
> 09:34:47,850 INFO  
> org.apache.flink.runtime.testingUtils.TestingMemoryArchivist  - Started 
> memory archivist akka://flink/user/archive_1
> 09:34:47,860 INFO  org.apache.flink.runtime.testutils.TestingResourceManager  
>- Trying to associate with JobManager leader akka://flink/user/jobmanager_1
> 09:34:47,861 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager
>- Starting JobManager at akka://flink/user/jobmanager_1.
> 09:34:47,862 WARN  org.apache.flink.runtime.testingUtils.TestingJobManager
>- Discard message 
> LeaderSessionMessage(----,TriggerSavepoint(6e813070338a23b0ff571646bca56521,Some(any)))
>  because there is currently no valid leader id known.
> 09:34:47,862 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager
>- JobManager akka://flink/user/jobmanager_1 was granted leadership with 
> leader session ID Some(----).
> 09:34:47,867 INFO  org.apache.flink.runtime.testutils.TestingResourceManager  
>- Resource Manager associating with leading JobManager 
> Actor[akka://flink/user/jobmanager_1#-652927556] - leader session 
> ----
> {code}
> If so, then this may be related to FLINK-6287 and may possibly even be a 
> duplicate.
> What is strange, though, is that the timeout for the expected message to 
> arrive is no more than 2 minutes, so the test should properly fail within 300s.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6294) BucketingSink throws NPE while cancelling job

2017-04-11 Thread Andrey (JIRA)
Andrey created FLINK-6294:
-

 Summary: BucketingSink throws NPE while cancelling job
 Key: FLINK-6294
 URL: https://issues.apache.org/jira/browse/FLINK-6294
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.2.0
Reporter: Andrey


Steps to reproduce:
* configure BucketingSink and run job
* cancel job from UI before processing any messages
* in logs:
{code}
2017-04-11 10:14:54,681 INFO  org.apache.flink.core.fs.FileSystem   
- Ensuring all FileSystem streams are closed for Source: Custom 
Source (1/2) [Source: Custom Source (1/2)]
2017-04-11 10:14:54,881 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Un-registering task and sending final execution state CANCELED to 
JobManager for task Source: Custom Source (56d0c9ffe06dc3e4481e7ce530d9894f) 
[flink-akka.actor.default-dispatcher-4]
2017-04-11 10:14:56,584 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
disposal of stream operator. [Flat Map -> Sink: Unnamed (2/2)]
java.lang.NullPointerException
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:422)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
{code}
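The trace points at {{BucketingSink.close()}} being called for a task that 
never processed a record, so its internal state was presumably never 
initialized. A hedged sketch of the kind of null guard that avoids such an NPE 
on disposal; the field and method names are illustrative, not the actual 
BucketingSink code:

{code:language=java}
import java.util.HashMap;
import java.util.Map;

public class SinkCloseSketch {

    // Stays null if the task is cancelled before open() ever populated it.
    private Map<String, AutoCloseable> bucketWriters;

    public void open() {
        bucketWriters = new HashMap<>();
    }

    public void close() throws Exception {
        if (bucketWriters == null) {
            return; // nothing was opened; skip instead of throwing an NPE
        }
        for (AutoCloseable writer : bucketWriters.values()) {
            writer.close(); // flush and close every per-bucket writer
        }
    }
}
{code}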



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6293) Flakey JobManagerITCase

2017-04-11 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-6293:
--

 Summary: Flakey JobManagerITCase
 Key: FLINK-6293
 URL: https://issues.apache.org/jira/browse/FLINK-6293
 Project: Flink
  Issue Type: Bug
  Components: Job-Submission, Tests
Affects Versions: 1.3.0
Reporter: Nico Kruber


Quite rarely, {{JobManagerITCase}} seems to hang, e.g. see 
https://api.travis-ci.org/jobs/220888193/log.txt?deansi=true 

The Maven watchdog kills the build because no output is produced within 300s, 
and {{JobManagerITCase}} seems to hang at line 772, i.e.
{code:title=JobManagerITCase lines 
770-772|language=java|linenumbers=true|firstline=770}
// Trigger savepoint for non-existing job
jobManager.tell(TriggerSavepoint(jobId, Option.apply("any")), testActor)
val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
{code}

Although the (downloaded) logs do not quite allow a precise mapping to this 
test case, it looks as if the following block may be related:

{code}
09:34:47,684 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster
 - Akka ask timeout set to 100s
09:34:47,777 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster
 - Disabled queryable state server
09:34:47,777 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster
 - Starting FlinkMiniCluster.
09:34:47,809 INFO  akka.event.slf4j.Slf4jLogger 
 - Slf4jLogger started
09:34:47,837 INFO  org.apache.flink.runtime.blob.BlobServer 
 - Created BLOB server storage directory 
/tmp/blobStore-eab23d04-ea18-4dc5-b1df-fcf9fc295062
09:34:47,838 WARN  org.apache.flink.runtime.net.SSLUtils
 - Not a SSL socket, will skip setting tls version and cipher suites.
09:34:47,839 INFO  org.apache.flink.runtime.blob.BlobServer 
 - Started BLOB server at 0.0.0.0:36745 - max concurrent requests: 50 - max 
backlog: 1000
09:34:47,840 INFO  org.apache.flink.runtime.metrics.MetricRegistry  
 - No metrics reporter configured, no metrics will be exposed/reported.
09:34:47,850 INFO  org.apache.flink.runtime.testingUtils.TestingMemoryArchivist 
 - Started memory archivist akka://flink/user/archive_1
09:34:47,860 INFO  org.apache.flink.runtime.testutils.TestingResourceManager
 - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
09:34:47,861 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager  
 - Starting JobManager at akka://flink/user/jobmanager_1.
09:34:47,862 WARN  org.apache.flink.runtime.testingUtils.TestingJobManager  
 - Discard message 
LeaderSessionMessage(----,TriggerSavepoint(6e813070338a23b0ff571646bca56521,Some(any)))
 because there is currently no valid leader id known.
09:34:47,862 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager  
 - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader 
session ID Some(----).
09:34:47,867 INFO  org.apache.flink.runtime.testutils.TestingResourceManager
 - Resource Manager associating with leading JobManager 
Actor[akka://flink/user/jobmanager_1#-652927556] - leader session 
----
{code}

If so, then this may be related to FLINK-6287 and may possibly even be a 
duplicate.

What is strange, though, is that the timeout for the expected message to arrive 
is no more than 2 minutes, so the test should properly fail within 300s.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964122#comment-15964122
 ] 

ASF GitHub Bot commented on FLINK-5544:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3359
  
@vpernin Thanks very much for your attention. The PR is supposed to work on 
1.3-SNAPSHOT, but it is not testable right now due to some known bugs. 

Besides, I want to add support for asynchronous snapshots of timers in this 
pull request. Currently, the snapshots for timers are taken synchronously --- 
no stream record can be processed while the snapshots are being taken. In our 
tests with millions of timers, it takes several seconds to complete the 
snapshotting. Performance is hence significantly degraded when checkpoints are 
frequent.

To allow asynchronous snapshotting, we need some refactoring of how internal 
timer services are restored and snapshotted. The `InternalTimerService`s, 
similar to keyed states, would then be stored in the `KeyedStateBackend`. That 
way, we can benefit from the optimizations made to the snapshotting of keyed 
states, taking snapshots asynchronously (and, in the near future, 
incrementally).

I am working on this right now. It would be appreciated if you could help 
test the feature when it is done. 


> Implement Internal Timer Service in RocksDB
> ---
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Now the only implementation of the internal timer service is 
> HeapInternalTimerService, which stores all timers in memory. In cases where 
> the number of keys is very large, the timer service will consume too much 
> memory. An implementation that stores timers in RocksDB seems well suited to 
> these cases.
> It might be a little challenging to implement a RocksDB timer service 
> because the timers are accessed in different ways. When timers are 
> triggered, we need to access timers in timestamp order. But when performing 
> checkpoints, we must have a way to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of 
> merge sort. We can store timers in RocksDB with the key format 
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are 
> stored together and sorted. 
> Then we can maintain an in-memory heap which keeps the first timer of each 
> key group, to get the next timer to trigger. When a key group's first timer 
> is updated, we can efficiently update the heap.
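A rough sketch of the suggested heap, assuming RocksDB already returns each 
key group's timers in sorted order; the types and the O(n) re-positioning are 
illustrative only, not Flink's API:

{code:language=java}
import java.util.PriorityQueue;

public class TimerHeadHeapSketch {

    /** The smallest (first) timer of one key group. */
    static final class GroupHead implements Comparable<GroupHead> {
        final int keyGroup;
        long headTimestamp;

        GroupHead(int keyGroup, long headTimestamp) {
            this.keyGroup = keyGroup;
            this.headTimestamp = headTimestamp;
        }

        @Override
        public int compareTo(GroupHead other) {
            return Long.compare(headTimestamp, other.headTimestamp);
        }
    }

    private final PriorityQueue<GroupHead> heads = new PriorityQueue<>();

    void register(GroupHead head) {
        heads.add(head);
    }

    /** The next timer to trigger is the minimum over all key-group heads. */
    GroupHead nextTimerToTrigger() {
        return heads.peek();
    }

    /** When a key group's first timer changes, re-position it in the heap. */
    void updateHead(GroupHead head, long newTimestamp) {
        heads.remove(head);       // O(n) here; a real index would do better
        head.headTimestamp = newTimestamp;
        heads.add(head);
    }
}
{code}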



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

2017-04-11 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3359
  
@vpernin Thanks very much for your attention. The PR is supposed to work on 
1.3-SNAPSHOT, but it is not testable right now due to some known bugs. 

Besides, I want to add support for asynchronous snapshots of timers in this 
pull request. Currently, the snapshots for timers are taken synchronously --- 
no stream record can be processed while the snapshots are being taken. In our 
tests with millions of timers, it takes several seconds to complete the 
snapshotting. Performance is hence significantly degraded when checkpoints are 
frequent.

To allow asynchronous snapshotting, we need some refactoring of how internal 
timer services are restored and snapshotted. The `InternalTimerService`s, 
similar to keyed states, would then be stored in the `KeyedStateBackend`. That 
way, we can benefit from the optimizations made to the snapshotting of keyed 
states, taking snapshots asynchronously (and, in the near future, 
incrementally).

I am working on this right now. It would be appreciated if you could help 
test the feature when it is done. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3708: [FLINK-6292] fix transfer.sh upload by using https

2017-04-11 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3708

[FLINK-6292] fix transfer.sh upload by using https

Seems the upload via http is not supported anymore.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6292

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3708.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3708


commit b704a31ec8e6c26aef58675b6724559622146399
Author: Nico Kruber 
Date:   2017-04-11T09:26:29Z

[FLINK-6292] fix transfer.sh upload by using https

Seems the upload via http is not supported anymore.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6292) Travis: transfer.sh not accepting uploads via http:// anymore

2017-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964092#comment-15964092
 ] 

ASF GitHub Bot commented on FLINK-6292:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3708

[FLINK-6292] fix transfer.sh upload by using https

Seems the upload via http is not supported anymore.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6292

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3708.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3708


commit b704a31ec8e6c26aef58675b6724559622146399
Author: Nico Kruber 
Date:   2017-04-11T09:26:29Z

[FLINK-6292] fix transfer.sh upload by using https

Seems the upload via http is not supported anymore.




> Travis: transfer.sh not accepting uploads via http:// anymore
> -
>
> Key: FLINK-6292
> URL: https://issues.apache.org/jira/browse/FLINK-6292
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.2.0, 1.3.0, 1.1.5
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The {{travis_mvn_watchdog.sh}} script tries to upload the logs to transfer.sh, 
> but it seems they no longer accept uploads to {{http://transfer.sh}} and only 
> accept {{https}} nowadays.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963873#comment-15963873
 ] 

mingleizhang edited comment on FLINK-6275 at 4/11/17 9:43 AM:
--

[~tedyu] Could you please assign this JIRA to me? Let me look into what 
happened, and I will give a patch soon.


was (Author: mingleizhang):
Let me inside it what happened and I will give a patch to this jira soon.

> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside the synchronized block.
> @GuardedBy indicates that access should be protected.
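One hedged sketch of how the guarded read could look: take the reference under 
the lock, then block on the future outside it. The lock and field names are 
assumptions, not the actual YarnFlinkApplicationMasterRunner code:

{code:language=java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class MasterRunnerSketch {

    private final Object lock = new Object();

    // @GuardedBy("lock") in the real code; only touched while holding it.
    private CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    void runApplicationMaster() throws ExecutionException, InterruptedException {
        final CompletableFuture<Void> future;
        synchronized (lock) {   // read the guarded field under the lock
            future = terminationFuture;
        }
        future.get();           // ...but wait for termination outside it
    }
}
{code}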



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6292) Travis: transfer.sh not accepting uploads via http:// anymore

2017-04-11 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-6292:
--

 Summary: Travis: transfer.sh not accepting uploads via http:// 
anymore
 Key: FLINK-6292
 URL: https://issues.apache.org/jira/browse/FLINK-6292
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.2.0, 1.3.0, 1.1.5
Reporter: Nico Kruber
Assignee: Nico Kruber


The {{travis_mvn_watchdog.sh}} script tries to upload the logs to transfer.sh, 
but it seems they no longer accept uploads to {{http://transfer.sh}} and only 
accept {{https}} nowadays.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6245) Fix late side output documentation in Window documents.

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964067#comment-15964067
 ] 

mingleizhang commented on FLINK-6245:
-

[~kkl0u] Could you please specify where the relevant code is in the Flink 
project, e.g. as packageName.ClassName?

> Fix late side output documentation in Window documents.
> ---
>
> Key: FLINK-6245
> URL: https://issues.apache.org/jira/browse/FLINK-6245
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Documentation
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Priority: Minor
> Fix For: 1.3.0
>
>
> There are two things that need to be done:
> 1) in the syntax description at the beginning of the page, we should also 
> include {{getSideOutput()}}
> 2) in the "Getting late data as a side output" section, the Java example 
> should not declare a {{DataStream result ...}} but a 
> {{SingleOutputStreamOperator}}, if we want to get the late-event side output.
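A short sketch of the corrected Java usage the second point asks for, assuming 
{{input}} is a {{DataStream<String>}} and a simple tumbling event-time window; 
the types and the 10-second window size are placeholders:

{code:language=java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

final OutputTag<String> lateTag = new OutputTag<String>("late-data") {};

SingleOutputStreamOperator<String> result = input
        .keyBy(value -> value)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .sideOutputLateData(lateTag)      // route late records to the tag
        .reduce((a, b) -> a + b);

// getSideOutput() exists on SingleOutputStreamOperator, not on DataStream,
// which is why the example must not declare the result as a plain DataStream.
DataStream<String> lateStream = result.getSideOutput(lateTag);
{code}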



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6291) Internal Timer service cannot be "removed"

2017-04-11 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-6291:
-

 Summary: Internal Timer service cannot be "removed"
 Key: FLINK-6291
 URL: https://issues.apache.org/jira/browse/FLINK-6291
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Streaming
Affects Versions: 1.2.0
Reporter: Gyula Fora


Currently it is not possible to register an internal timer service in one job 
and remove it after a savepoint, as a NullPointerException is thrown during the 
next savepoint:

Caused by: java.lang.Exception: Could not write timer service of MyOperator 
(17/60) to checkpoint state stream.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:418)
at 
com.king.rbea.backend.operators.scriptexecution.RBEAOperator.snapshotState(RBEAOperator.java:327)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
... 13 more
Caused by: java.lang.NullPointerException
at 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(HeapInternalTimerService.java:294)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:414)
... 15 more

The timer serializer is null in this case because the timer service was never 
properly started.

We should probably discard the timers of services that are not re-registered 
after restore, so we can get rid of the state completely.
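A minimal sketch of the defensive behavior suggested above (illustrative only, 
not the actual HeapInternalTimerService code): treat a never-restarted service 
as empty instead of dereferencing its null serializer:

{code:language=java}
import java.io.IOException;

public class TimerSnapshotSketch {

    // Remains null when the service was restored but never re-registered.
    private Object keySerializer;

    void snapshotTimersForKeyGroup(int keyGroupIdx) throws IOException {
        if (keySerializer == null) {
            // Nothing was registered after restore: write an empty snapshot
            // (or drop the state) instead of throwing a NullPointerException.
            return;
        }
        // ... serialize the key group's timers using the serializer ...
    }
}
{code}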



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964022#comment-15964022
 ] 

ASF GitHub Bot commented on FLINK-6275:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3707
  
@rmetzger @greghogan Could you please take some time to review my code? Many 
thanks to you both.


> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside the synchronized block.
> @GuardedBy indicates that access should be protected.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3707: [FLINK-6275] [yarn] Fix unprotected access to resourceMan...

2017-04-11 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3707
  
@rmetzger @greghogan Could you please take some time to review my code? Many 
thanks to you both.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963968#comment-15963968
 ] 

ASF GitHub Bot commented on FLINK-6275:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/3707

[FLINK-6275] [yarn] Fix unprotected access to resourceManager in 
YarnFlinkApplicationMasterRunner

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3707.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3707


commit 145c7239c5e3f5f84b2f7149a1bc275013582121
Author: rice.zhang 
Date:   2017-04-11T07:15:11Z

[FLINK-6275] [yarn] Fix unprotected access to resourceManager in 
YarnFlinkApplicationMasterRunner




> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside the synchronized block.
> @GuardedBy indicates that access should be protected.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3707: [FLINK-6275] [yarn] Fix unprotected access to reso...

2017-04-11 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/3707

[FLINK-6275] [yarn] Fix unprotected access to resourceManager in 
YarnFlinkApplicationMasterRunner

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3707.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3707


commit 145c7239c5e3f5f84b2f7149a1bc275013582121
Author: rice.zhang 
Date:   2017-04-11T07:15:11Z

[FLINK-6275] [yarn] Fix unprotected access to resourceManager in 
YarnFlinkApplicationMasterRunner




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963873#comment-15963873
 ] 

mingleizhang edited comment on FLINK-6275 at 4/11/17 7:03 AM:
--

Let me look into what happened, and I will give a patch for this JIRA soon.


was (Author: mingleizhang):
Let me inside it what happened.

> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside the synchronized block.
> @GuardedBy indicates that access should be protected.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)