[jira] [Commented] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964530#comment-15964530
 ] 

mingleizhang commented on FLINK-6143:
-

[~Zentol] Thanks for your help. The PR is ready now. Could you please review 
it? Many thanks.

> Unprotected access to this.flink in LocalExecutor#endSession()
> --
>
> Key: FLINK-6143
> URL: https://issues.apache.org/jira/browse/FLINK-6143
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   public void endSession(JobID jobID) throws Exception {
>     LocalFlinkMiniCluster flink = this.flink;
>     if (flink != null) {
> {code}
> The flink field is not declared volatile and access to this.flink doesn't 
> hold the LocalExecutor.lock
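
One way to close the gap described above is to read the field while holding the lock (declaring the field volatile would be the alternative). The following is only a minimal sketch of that pattern, not Flink code: `SessionExecutor` and `MiniCluster` are hypothetical stand-ins for `LocalExecutor` and `LocalFlinkMiniCluster`, and the `String` job ID replaces `JobID` to keep the example self-contained.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for LocalFlinkMiniCluster; it only counts disposed sessions. */
class MiniCluster {
    final AtomicInteger disposedSessions = new AtomicInteger();

    void disposeSession(String jobId) {
        disposedSessions.incrementAndGet();
    }
}

/** Hypothetical stand-in for LocalExecutor, showing only the guarded field access. */
class SessionExecutor {
    private final Object lock = new Object();

    // Guarded by "lock": every read and write happens inside synchronized (lock).
    private MiniCluster flink;

    void start() {
        synchronized (lock) {
            if (flink == null) {
                flink = new MiniCluster();
            }
        }
    }

    void stop() {
        synchronized (lock) {
            flink = null;
        }
    }

    /** Returns true if a running cluster was found and the session was disposed. */
    boolean endSession(String jobId) {
        MiniCluster current;
        synchronized (lock) {
            // Copy the reference under the lock so that a concurrent stop() is observed.
            current = this.flink;
        }
        if (current == null) {
            return false;
        }
        current.disposeSession(jobId);
        return true;
    }
}
```

The local copy keeps the critical section short: only the field read is guarded, while the call on the cluster happens after the lock is released.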



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-04-11 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6275:
---

Assignee: mingleizhang

> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside synchronized block.
> @GuardedBy indicates that access should be protected.





[jira] [Comment Edited] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963873#comment-15963873
 ] 

mingleizhang edited comment on FLINK-6275 at 4/11/17 7:03 AM:
--

Let me look into what happened; I will provide a patch for this JIRA soon.


was (Author: mingleizhang):
Let me look into what happened.

> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside synchronized block.
> @GuardedBy indicates that access should be protected.





[jira] [Commented] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-04-10 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963873#comment-15963873
 ] 

mingleizhang commented on FLINK-6275:
-

Let me look into what happened.

> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside synchronized block.
> @GuardedBy indicates that access should be protected.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963873#comment-15963873
 ] 

mingleizhang edited comment on FLINK-6275 at 4/11/17 9:43 AM:
--

[~tedyu] Could you please assign this JIRA to me? Let me look into what 
happened; I will provide a patch for this JIRA soon.


was (Author: mingleizhang):
Let me look into what happened; I will provide a patch for this JIRA soon.

> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside synchronized block.
> @GuardedBy indicates that access should be protected.





[jira] [Commented] (FLINK-6245) Fix late side output documentation in Window documents.

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964067#comment-15964067
 ] 

mingleizhang commented on FLINK-6245:
-

[~kkl0u] Could you please specify where the relevant code lives in the Flink 
project, for example as packageName.ClassName?

> Fix late side output documentation in Window documents.
> ---
>
> Key: FLINK-6245
> URL: https://issues.apache.org/jira/browse/FLINK-6245
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Documentation
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Priority: Minor
> Fix For: 1.3.0
>
>
> There are two things that need to be done:
> 1) in the syntax description in the beginning of the page, we should also 
> include the {{getSideOutput()}}
> 2) in the "Getting late data as a side output" section and for the Java 
> example, it should not be a {{DataStream result ...}} but a 
> {{SingleOutputStreamOperator}}, if we want to get the late event side output.





[jira] [Comment Edited] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964153#comment-15964153
 ] 

mingleizhang edited comment on FLINK-6143 at 4/11/17 10:37 AM:
---

[~tedyu] Hi, could you please assign this JIRA to me? I will work on it 
soon. Thanks.


was (Author: mingleizhang):
[~tedyu] Hi, could you please assign this JIRA to me? I will work on it 
soon. Thanks.

> Unprotected access to this.flink in LocalExecutor#endSession()
> --
>
> Key: FLINK-6143
> URL: https://issues.apache.org/jira/browse/FLINK-6143
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public void endSession(JobID jobID) throws Exception {
>     LocalFlinkMiniCluster flink = this.flink;
>     if (flink != null) {
> {code}
> The flink field is not declared volatile and access to this.flink doesn't 
> hold the LocalExecutor.lock





[jira] [Commented] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()

2017-04-11 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964153#comment-15964153
 ] 

mingleizhang commented on FLINK-6143:
-

[~tedyu] Hi, could you please assign this JIRA to me? I will work on it 
soon. Thanks.

> Unprotected access to this.flink in LocalExecutor#endSession()
> --
>
> Key: FLINK-6143
> URL: https://issues.apache.org/jira/browse/FLINK-6143
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public void endSession(JobID jobID) throws Exception {
>     LocalFlinkMiniCluster flink = this.flink;
>     if (flink != null) {
> {code}
> The flink field is not declared volatile and access to this.flink doesn't 
> hold the LocalExecutor.lock





[jira] [Commented] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()

2017-04-12 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966974#comment-15966974
 ] 

mingleizhang commented on FLINK-6143:
-

[~StephanEwen] Hi, could you please take some time to review this patch? Many 
thanks.

> Unprotected access to this.flink in LocalExecutor#endSession()
> --
>
> Key: FLINK-6143
> URL: https://issues.apache.org/jira/browse/FLINK-6143
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   public void endSession(JobID jobID) throws Exception {
>     LocalFlinkMiniCluster flink = this.flink;
>     if (flink != null) {
> {code}
> The flink field is not declared volatile and access to this.flink doesn't 
> hold the LocalExecutor.lock





[jira] [Commented] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-04-12 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966977#comment-15966977
 ] 

mingleizhang commented on FLINK-6275:
-

[~StephanEwen] Hi, could you please take some time to review this patch and 
give some advice on it? Thanks.

> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside synchronized block.
> @GuardedBy indicates that access should be protected.





[jira] [Comment Edited] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-16 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970586#comment-15970586
 ] 

mingleizhang edited comment on FLINK-6130 at 4/17/17 1:51 AM:
--

[~tedyu] Could you please review the code below? What do you think of it? 
Thanks.
{code}
Future future;
synchronized (lock) {
    // wait for resource manager to finish
    future = (Future) resourceManager.getTerminationFuture();
}
Object object = future.get();
// everything started, we can wait until all is done or the process is killed
LOG.info("YARN Application Master finished" + object.toString());
{code}


was (Author: mingleizhang):
[~tedyu] Could you please review the code below? What do you think of it? 
Thanks.
{code}
Future future;
synchronized (lock) {
    // wait for resource manager to finish
    future = (Future) resourceManager.getTerminationFuture();
}
Object object = future.get();
// everything started, we can wait until all is done or the process is killed
LOG.info("YARN Application Master finished" + object.toString());
{code}

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
>     LOG.info("Starting High Availability Services");
>     ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.
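
The suggestion above (fetch the future while holding the lock, then wait on it after releasing the lock) can be sketched as follows. This is an illustration under stated assumptions rather than the actual patch: `FakeResourceManager` and `MasterRunner` are hypothetical names, and `java.util.concurrent.CompletableFuture` stands in for whatever future type `getTerminationFuture()` returns in Flink.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the resource manager; only the termination future matters here. */
class FakeResourceManager {
    private final CompletableFuture<String> terminationFuture = new CompletableFuture<>();

    CompletableFuture<String> getTerminationFuture() {
        return terminationFuture;
    }

    void shutDown() {
        terminationFuture.complete("terminated");
    }
}

/** Hypothetical runner that guards the resourceManager field with a lock. */
class MasterRunner {
    private final Object lock = new Object();

    // Guarded by "lock".
    private FakeResourceManager resourceManager;

    void start(FakeResourceManager rm) {
        synchronized (lock) {
            resourceManager = rm;
        }
    }

    /** Blocks until the resource manager terminates and returns its result. */
    String awaitTermination() {
        CompletableFuture<String> future;
        synchronized (lock) {
            // Only the field access needs the lock.
            future = resourceManager.getTerminationFuture();
        }
        // Wait outside the lock so that other threads (for example a shutdown) can still acquire it.
        return future.join();
    }
}
```

Blocking inside the synchronized block would keep the lock held for the lifetime of the application, which is why only the field read is guarded here.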





[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-16 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970586#comment-15970586
 ] 

mingleizhang commented on FLINK-6130:
-

[~tedyu] Could you please review the code below? What do you think of it? 
Thanks.
{code}
Future future;
synchronized (lock) {
    // wait for resource manager to finish
    future = (Future) resourceManager.getTerminationFuture();
}
Object object = future.get();
// everything started, we can wait until all is done or the process is killed
LOG.info("YARN Application Master finished" + object.toString());
{code}

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
>     LOG.info("Starting High Availability Services");
>     ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.





[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-16 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970598#comment-15970598
 ] 

mingleizhang commented on FLINK-6130:
-

[~tedyu] Yep. The code would look like the snippet below; what do you think? Thanks.

{code}
@Override
protected int runApplicationMaster(Configuration config) {
    Future future;

    synchronized (lock) {
        LOG.info("Starting High Availability Services");

        // wait for resource manager to finish
        future = (Future) resourceManager.getTerminationFuture();

        //  (5) start the web monitor
        // TODO: add web monitor
    }
    Object object = future.value().get();

    // everything started, we can wait until all is done or the process is killed
    LOG.info("YARN Application Master finished" + object.toString());
{code}

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
>     LOG.info("Starting High Availability Services");
>     ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.





[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-16 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970602#comment-15970602
 ] 

mingleizhang commented on FLINK-6130:
-

[~tedyu] Thanks again, I appreciate it. If there are no further questions, I will 
open a PR soon.

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
>     LOG.info("Starting High Availability Services");
>     ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.





[jira] [Commented] (FLINK-5611) Add QueryableStateException type

2017-04-17 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970729#comment-15970729
 ] 

mingleizhang commented on FLINK-5611:
-

[~uce] Hi, what do you think of this? It lives at 
{code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks.

{code}
public class QueryableStateException extends Exception {

    private static final long serialVersionUID = 1L;

    public QueryableStateException() {
        super("Queryable state exception.");
    }
}
{code}

> Add QueryableStateException type
> 
>
> Key: FLINK-5611
> URL: https://issues.apache.org/jira/browse/FLINK-5611
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Assignee: mingleizhang
>Priority: Minor
>
> We currently have some exceptions like {{UnknownJobManager}} and the like 
> that should be sub types of the to be introduced {{QueryableStateException}}. 
> Right now, they extend checked and unchecked Exceptions.
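
To illustrate the hierarchy described above, here is a minimal sketch of a common base type with one existing exception re-parented onto it. The constructor signatures and the message text are assumptions made for illustration and are not taken from the Flink code base; only the names `QueryableStateException` and `UnknownJobManager` come from the issue.

```java
/** Sketch of a common base type; the constructors are an assumption, not the Flink API. */
class QueryableStateException extends Exception {
    private static final long serialVersionUID = 1L;

    QueryableStateException(String message) {
        super(message);
    }

    QueryableStateException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** One of the exceptions named in the issue, re-parented onto the new base type. */
class UnknownJobManager extends QueryableStateException {
    private static final long serialVersionUID = 1L;

    UnknownJobManager() {
        // Hypothetical message, chosen only for this example.
        super("JobManager is not known.");
    }
}
```

With a shared base type, callers can catch `QueryableStateException` once instead of enumerating unrelated checked and unchecked exceptions, and each subtype can supply its own specific message.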





[jira] [Assigned] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()

2017-04-17 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-5943:
---

Assignee: mingleizhang

> Unprotected access to haServices in 
> YarnFlinkApplicationMasterRunner#shutdown()
> ---
>
> Key: FLINK-5943
> URL: https://issues.apache.org/jira/browse/FLINK-5943
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   protected void shutdown(ApplicationStatus status, String msg) {
>     // Need to clear the job state in the HA services before shutdown
>     try {
>       haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
>     }
> {code}
> The access to haServices is without lock protection.
> haServices may have been closed.
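
Both concerns raised above (no lock, and services that may already be closed) can be handled by checking and using the field inside one synchronized block. The sketch below only illustrates that idea: `FakeHaServices` and `ShutdownRunner` are hypothetical stand-ins, and setting the field to null after closing is an assumption about how "closed" would be tracked.

```java
/** Hypothetical stand-in for the HA services; tracks closure and the number of cleared jobs. */
class FakeHaServices {
    private boolean closed;
    private int clearedJobs;

    void clearJob(String jobId) {
        if (closed) {
            throw new IllegalStateException("HA services already closed");
        }
        clearedJobs++;
    }

    void close() {
        closed = true;
    }

    int clearedJobs() {
        return clearedJobs;
    }
}

/** Hypothetical runner whose shutdown() touches haServices only while holding the lock. */
class ShutdownRunner {
    private final Object lock = new Object();

    // Guarded by "lock"; set to null once the services have been closed.
    private FakeHaServices haServices;

    ShutdownRunner(FakeHaServices haServices) {
        this.haServices = haServices;
    }

    /** Clears the job under the lock; returns false if the services are already gone. */
    boolean shutdown(String jobId) {
        synchronized (lock) {
            if (haServices == null) {
                return false;
            }
            try {
                haServices.clearJob(jobId);
            } finally {
                // Close exactly once, even if clearing the job failed.
                haServices.close();
                haServices = null;
            }
            return true;
        }
    }
}
```

A second call to `shutdown` then becomes a harmless no-op instead of an access to closed services.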





[jira] [Commented] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()

2017-04-17 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970759#comment-15970759
 ] 

mingleizhang commented on FLINK-5943:
-

[~tedyu] Hi Ted, what do you think of this? I have moved the access to 
haServices inside the synchronized block. Thanks.
{code}
protected void shutdown(ApplicationStatus status, String msg) {
    synchronized (lock) {
        // Need to clear the job state in the HA services before shutdown
        try {
            haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
        }
        catch (Throwable t) {
            LOG.warn("Could not clear the job at the high-availability services", t);
        }
        if (jobManagerRunner != null) {
            try {
                jobManagerRunner.shutdown();
            } catch (Throwable tt) {
                LOG.warn("Failed to stop the JobManagerRunner", tt);
            }
        }
{code}

> Unprotected access to haServices in 
> YarnFlinkApplicationMasterRunner#shutdown()
> ---
>
> Key: FLINK-5943
> URL: https://issues.apache.org/jira/browse/FLINK-5943
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   protected void shutdown(ApplicationStatus status, String msg) {
>     // Need to clear the job state in the HA services before shutdown
>     try {
>       haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
>     }
> {code}
> The access to haServices is without lock protection.
> haServices may have been closed.





[jira] [Created] (FLINK-6313) Some words was spelled wrong and incorrect LOG.error without print

2017-04-17 Thread mingleizhang (JIRA)
mingleizhang created FLINK-6313:
---

 Summary: Some words was spelled wrong and incorrect LOG.error 
without print
 Key: FLINK-6313
 URL: https://issues.apache.org/jira/browse/FLINK-6313
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Reporter: mingleizhang
Assignee: mingleizhang
Priority: Trivial


I found that some words are misspelled and that some LOG.error calls do not print any information.





[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-16 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970588#comment-15970588
 ] 

mingleizhang commented on FLINK-6130:
-

[~tedyu] Thanks, I appreciate it. I will open a PR for this JIRA soon.

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
>     LOG.info("Starting High Availability Services");
>     ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.





[jira] [Comment Edited] (FLINK-5611) Add QueryableStateException type

2017-04-17 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970729#comment-15970729
 ] 

mingleizhang edited comment on FLINK-5611 at 4/17/17 6:35 AM:
--

[~uce] Hi, what do you think of this? It lives at the location below. Thanks.
{code}org.apache.flink.runtime.query.QueryableStateException{code}

{code}
public class QueryableStateException extends Exception {

    private static final long serialVersionUID = 1L;

    public QueryableStateException() {
        super("Queryable state exception.");
    }
}
{code} 
I really don't know what kinds of failures can cause a QueryableStateException, 
so I couldn't give a specific message here. For now it is just "Queryable state 
exception".


was (Author: mingleizhang):
[~uce] Hi, what do you think of this? It lives at 
{code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks.

{code}
public class QueryableStateException extends Exception {

    private static final long serialVersionUID = 1L;

    public QueryableStateException() {
        super("Queryable state exception.");
    }
}
{code} 
I really don't know what kinds of failures can cause a QueryableStateException, 
so I couldn't give a specific message here. For now it is just "Queryable state 
exception".

> Add QueryableStateException type
> 
>
> Key: FLINK-5611
> URL: https://issues.apache.org/jira/browse/FLINK-5611
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Assignee: mingleizhang
>Priority: Minor
>
> We currently have some exceptions like {{UnknownJobManager}} and the like 
> that should be sub types of the to be introduced {{QueryableStateException}}. 
> Right now, they extend checked and unchecked Exceptions.





[jira] [Comment Edited] (FLINK-5611) Add QueryableStateException type

2017-04-17 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970729#comment-15970729
 ] 

mingleizhang edited comment on FLINK-5611 at 4/17/17 6:34 AM:
--

[~uce] Hi, what do you think of this? It lives at 
{code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks.

{code}
public class QueryableStateException extends Exception {

    private static final long serialVersionUID = 1L;

    public QueryableStateException() {
        super("Queryable state exception.");
    }
}
{code} 
I really don't know what kinds of failures can cause a QueryableStateException, 
so I couldn't give a specific message here. For now it is just "Queryable state 
exception".


was (Author: mingleizhang):
[~uce] Hi, what do you think of this? It lives at 
{code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks.

{code}
public class QueryableStateException extends Exception {

    private static final long serialVersionUID = 1L;

    public QueryableStateException() {
        super("Queryable state exception.");
    }
}
{code}

> Add QueryableStateException type
> 
>
> Key: FLINK-5611
> URL: https://issues.apache.org/jira/browse/FLINK-5611
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Assignee: mingleizhang
>Priority: Minor
>
> We currently have some exceptions like {{UnknownJobManager}} and the like 
> that should be sub types of the to be introduced {{QueryableStateException}}. 
> Right now, they extend checked and unchecked Exceptions.





[jira] [Updated] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class

2017-04-20 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-6351:

Priority: Major  (was: Minor)

> Refactoring YarnFlinkApplicationMasterRunner by combining 
> AbstractYarnFlinkApplicationMasterRunner in one class
> ---
>
> Key: FLINK-6351
> URL: https://issues.apache.org/jira/browse/FLINK-6351
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: mingleizhang
>Assignee: mingleizhang
>






[jira] [Updated] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class

2017-04-20 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-6351:

Description: Currently, YarnFlinkApplicationMasterRunner inherits from 
AbstractYarnFlinkApplicationMasterRunner, which seems unnecessary. We could 
combine the two classes and then instantiate the services and runtime components 
in the constructor of YarnFlinkApplicationMasterRunner to get rid of the lock 
in the run method.

> Refactoring YarnFlinkApplicationMasterRunner by combining 
> AbstractYarnFlinkApplicationMasterRunner in one class
> ---
>
> Key: FLINK-6351
> URL: https://issues.apache.org/jira/browse/FLINK-6351
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> Currently, YarnFlinkApplicationMasterRunner inherits from 
> AbstractYarnFlinkApplicationMasterRunner, which seems unnecessary. We 
> could combine the two classes and then instantiate the services and runtime 
> components in the constructor of YarnFlinkApplicationMasterRunner to get 
> rid of the lock in run method.
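
The idea of creating the components in the constructor so that the run method no longer needs a lock can be sketched as below. This is an assumption-laden illustration, not the proposed refactoring itself: `MergedRunner` and `FakeService` are hypothetical names, and the real services are replaced by trivial placeholders.

```java
/** Hypothetical service type standing in for the HA services, the resource manager, and so on. */
class FakeService {
    private final String name;

    FakeService(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

/** Single merged runner: components are created once in the constructor and never reassigned. */
class MergedRunner {
    // Final fields are safely published once the constructor completes
    // (as long as "this" does not escape during construction),
    // so reading them in run() needs no lock.
    private final FakeService haServices;
    private final FakeService resourceManager;

    MergedRunner() {
        this.haServices = new FakeService("ha");
        this.resourceManager = new FakeService("rm");
    }

    String run() {
        return haServices.name() + "+" + resourceManager.name();
    }
}
```

Note that this only removes the need to guard the references; any mutable state inside the services themselves would still need its own synchronization.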





[jira] [Comment Edited] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class

2017-04-20 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977998#comment-15977998
 ] 

mingleizhang edited comment on FLINK-6351 at 4/21/17 3:10 AM:
--

[~till.rohrmann] Hi, Till. Please take a look; what do you think of this? 
If there are no further questions, I will start working on this soon. Many thanks.


was (Author: mingleizhang):
[~till.rohrmann] Hi, Till. Please take a look; what do you think of this? 
Many thanks.

> Refactoring YarnFlinkApplicationMasterRunner by combining 
> AbstractYarnFlinkApplicationMasterRunner in one class
> ---
>
> Key: FLINK-6351
> URL: https://issues.apache.org/jira/browse/FLINK-6351
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> Currently, YarnFlinkApplicationMasterRunner inherits from 
> AbstractYarnFlinkApplicationMasterRunner, which seems unnecessary. We 
> could combine the two classes and then instantiate the services and runtime 
> components in the constructor of YarnFlinkApplicationMasterRunner to get 
> rid of the lock in the run method.





[jira] [Created] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class

2017-04-20 Thread mingleizhang (JIRA)
mingleizhang created FLINK-6351:
---

 Summary: Refactoring YarnFlinkApplicationMasterRunner by combining 
AbstractYarnFlinkApplicationMasterRunner in one class
 Key: FLINK-6351
 URL: https://issues.apache.org/jira/browse/FLINK-6351
 Project: Flink
  Issue Type: Improvement
  Components: YARN
Reporter: mingleizhang
Assignee: mingleizhang
Priority: Minor








[jira] [Commented] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class

2017-04-20 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977998#comment-15977998
 ] 

mingleizhang commented on FLINK-6351:
-

[~till.rohrmann] Hi, Till. Please take a look; what do you think of this? 
Many thanks.

> Refactoring YarnFlinkApplicationMasterRunner by combining 
> AbstractYarnFlinkApplicationMasterRunner in one class
> ---
>
> Key: FLINK-6351
> URL: https://issues.apache.org/jira/browse/FLINK-6351
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> Currently, YarnFlinkApplicationMasterRunner inherits from 
> AbstractYarnFlinkApplicationMasterRunner, which seems unnecessary. We 
> could combine the two classes and then instantiate the services and runtime 
> components in the constructor of YarnFlinkApplicationMasterRunner to get 
> rid of the lock in the run method.





[jira] [Assigned] (FLINK-6345) Migrate from Java serialization for ContinuousFileReaderOperator's state

2017-04-21 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6345:
---

Assignee: mingleizhang

> Migrate from Java serialization for ContinuousFileReaderOperator's state
> 
>
> Key: FLINK-6345
> URL: https://issues.apache.org/jira/browse/FLINK-6345
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration 
> for {{ContinuousFileReaderOperator}}.





[jira] [Commented] (FLINK-6345) Migrate from Java serialization for ContinuousFileReaderOperator's state

2017-04-21 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978559#comment-15978559
 ] 

mingleizhang commented on FLINK-6345:
-

 I will try to solve this issue. 

> Migrate from Java serialization for ContinuousFileReaderOperator's state
> 
>
> Key: FLINK-6345
> URL: https://issues.apache.org/jira/browse/FLINK-6345
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration 
> for {{ContinuousFileReaderOperator}}.





[jira] [Assigned] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-16 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6130:
---

Assignee: mingleizhang

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.
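
A minimal sketch of the pattern suggested in the description: read the guarded field while holding the lock, then block on the captured future outside the lock. The class {{TerminationFutureSketch}} and its methods are hypothetical stand-ins, not Flink code; only the locking pattern is the point.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the runner; names are assumptions for illustration.
class TerminationFutureSketch {
    private final Object lock = new Object();

    // Guarded by "lock" in this sketch.
    private CompletableFuture<Void> terminationFuture;

    // Starts the (simulated) service and captures its future under the lock.
    CompletableFuture<Void> start() {
        final CompletableFuture<Void> captured;
        synchronized (lock) {
            terminationFuture = new CompletableFuture<>();
            captured = terminationFuture; // guarded field is read while the lock is held
        }
        return captured;
    }

    // Simulates the service finishing.
    void finish() {
        synchronized (lock) {
            if (terminationFuture != null) {
                terminationFuture.complete(null);
            }
        }
    }

    public static void main(String[] args) {
        TerminationFutureSketch runner = new TerminationFutureSketch();
        CompletableFuture<Void> future = runner.start();
        runner.finish();
        future.join(); // wait outside the lock so other threads can still acquire it
        System.out.println("terminated: " + future.isDone());
    }
}
```

Waiting outside the synchronized block matters here: blocking on the future while holding the lock would keep a shutdown path from ever acquiring it.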





[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-16 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970332#comment-15970332
 ] 

mingleizhang commented on FLINK-6130:
-

[~tedyu] Thanks. How should the value returned from 
resourceManager#getTerminationFuture() be used once it is stored, or should 
nothing be done with it? This issue also seems to overlap with FLINK-6275.

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.





[jira] [Assigned] (FLINK-6104) Resource leak in ListViaRangeSpeedMiniBenchmark

2017-04-16 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6104:
---

Assignee: mingleizhang

> Resource leak in ListViaRangeSpeedMiniBenchmark
> ---
>
> Key: FLINK-6104
> URL: https://issues.apache.org/jira/browse/FLINK-6104
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> final WriteOptions write_options = new WriteOptions()
> .setSync(false)
> .setDisableWAL(true);
> {code}
> WriteOptions ultimately extends AbstractNativeReference where:
> {code}
> public abstract class AbstractNativeReference implements AutoCloseable {
> {code}
> WriteOptions instance should be closed.
> {code}
> final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
> {code}
> RocksDB should be closed as well.
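
A minimal sketch of how both resources could be released with try-with-resources. {{NativeHandleSketch}} is a hypothetical stand-in for any AutoCloseable native handle (such as the WriteOptions and RocksDB instances above); it does not use the RocksDB API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical AutoCloseable stand-in for a native handle.
class NativeHandleSketch implements AutoCloseable {
    // Records the order in which handles were closed, for demonstration only.
    static final List<String> CLOSED = new ArrayList<>();

    private final String name;

    NativeHandleSketch(String name) {
        this.name = name;
    }

    @Override
    public void close() {
        CLOSED.add(name);
    }

    // Both handles are closed automatically, even if the body throws.
    static void runBenchmark() {
        try (NativeHandleSketch writeOptions = new NativeHandleSketch("writeOptions");
             NativeHandleSketch rocksDB = new NativeHandleSketch("rocksDB")) {
            // benchmark body would go here
        }
    }

    public static void main(String[] args) {
        runBenchmark();
        System.out.println(CLOSED);
    }
}
```

Resources declared in one try header are closed in reverse declaration order, so the database handle is released before the options it was using.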





[jira] [Commented] (FLINK-6104) Resource leak in ListViaRangeSpeedMiniBenchmark

2017-04-16 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970341#comment-15970341
 ] 

mingleizhang commented on FLINK-6104:
-

I will open a PR for this JIRA soon.

> Resource leak in ListViaRangeSpeedMiniBenchmark
> ---
>
> Key: FLINK-6104
> URL: https://issues.apache.org/jira/browse/FLINK-6104
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> final WriteOptions write_options = new WriteOptions()
> .setSync(false)
> .setDisableWAL(true);
> {code}
> WriteOptions ultimately extends AbstractNativeReference where:
> {code}
> public abstract class AbstractNativeReference implements AutoCloseable {
> {code}
> WriteOptions instance should be closed.
> {code}
> final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
> {code}
> RocksDB should be closed as well.





[jira] [Assigned] (FLINK-5611) Add QueryableStateException type

2017-04-16 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-5611:
---

Assignee: mingleizhang

> Add QueryableStateException type
> 
>
> Key: FLINK-5611
> URL: https://issues.apache.org/jira/browse/FLINK-5611
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Assignee: mingleizhang
>Priority: Minor
>
> We currently have some exceptions like {{UnknownJobManager}} and the like 
> that should be subtypes of the to-be-introduced {{QueryableStateException}}. 
> Right now, they extend checked and unchecked Exceptions.
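
A minimal sketch of what such a hierarchy could look like. The two classes below are simplified stand-ins written for illustration, not the Flink classes; making the base type a checked exception and the message text are assumptions, since the description does not specify either.

```java
// Assumed common base type; checked here, which is a design assumption.
class QueryableStateException extends Exception {
    QueryableStateException(String message) {
        super(message);
    }
}

// Simplified stand-in for an existing exception, re-parented to the base type.
class UnknownJobManager extends QueryableStateException {
    UnknownJobManager() {
        super("No JobManager known for the queried job.");
    }
}

class QueryableStateExceptionSketch {
    // Callers can now distinguish all queryable-state failures with one type check.
    static String describe(Exception e) {
        return (e instanceof QueryableStateException)
                ? "queryable-state failure"
                : "other failure";
    }

    public static void main(String[] args) {
        System.out.println(describe(new UnknownJobManager()));
        System.out.println(describe(new IllegalStateException("unrelated")));
    }
}
```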





[jira] [Commented] (FLINK-5611) Add QueryableStateException type

2017-04-16 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970385#comment-15970385
 ] 

mingleizhang commented on FLINK-5611:
-

[~uce] Hi, I will work on this soon. Thanks.

> Add QueryableStateException type
> 
>
> Key: FLINK-5611
> URL: https://issues.apache.org/jira/browse/FLINK-5611
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Assignee: mingleizhang
>Priority: Minor
>
> We currently have some exceptions like {{UnknownJobManager}} and the like 
> that should be subtypes of the to-be-introduced {{QueryableStateException}}. 
> Right now, they extend checked and unchecked Exceptions.





[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972736#comment-15972736
 ] 

mingleizhang commented on FLINK-6130:
-

[~till.rohrmann] Void? I see. I think {code}@GuardedBy("lock"){code} 
might be wrong as well. Could we refine it as follows?

Change the code 
{code}
@GuardedBy("lock")
private ResourceManager resourceManager;
{code}

to 
{code}
private volatile ResourceManager resourceManager;
{code}

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.





[jira] [Comment Edited] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972736#comment-15972736
 ] 

mingleizhang edited comment on FLINK-6130 at 4/18/17 2:10 PM:
--

[~till.rohrmann] Void? I see. I think {code}@GuardedBy("lock"){code} 
might be wrong as well. Could we refine it as follows?

Change the code 
{code}
@GuardedBy("lock")
private ResourceManager resourceManager;
{code}

to 
{code}
private volatile ResourceManager resourceManager;
{code}
[~Zentol] FYI. Thanks and appreciate it.


was (Author: mingleizhang):
[~till.rohrmann] Void? I see. I think {code}@GuardedBy("lock"){code} 
might be wrong as well. Could we refine it as follows?

Change the code 
{code}
@GuardedBy("lock")
private ResourceManager resourceManager;
{code}

to 
{code}
private volatile ResourceManager resourceManager;
{code}

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.





[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972656#comment-15972656
 ] 

mingleizhang commented on FLINK-6130:
-

[~till.rohrmann] Thanks for the review. Apart from what [~Zentol] said, another 
reason is to get more information about the return value, for example: 
{code}LOG.info("YARN Application Master finished and the result is {}", result);{code}

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.





[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972690#comment-15972690
 ] 

mingleizhang commented on FLINK-6130:
-

[~Zentol] Doesn't it return the status of the YARN Application Master, 
such as failure or success?

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.





[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition

2017-04-19 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976012#comment-15976012
 ] 

mingleizhang commented on FLINK-6264:
-

Hi, I do not think this is a bug when brokers fail and are restarted. The 
exception is thrown because {code}unassignedPartitions.size() > 0{code}. 
[~tzulitai] Hi, what is your opinion on this issue?

> Kafka consumer fails if can't find leader for partition
> ---
>
> Key: FLINK-6264
> URL: https://issues.apache.org/jira/browse/FLINK-6264
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>
> We have observed the following error many times when brokers failed/were 
> restarted:
> java.lang.RuntimeException: Unable to find a leader for partitions: 
> [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, 
> KafkaPartitionHandle=[mytopic,10], offset=-1]
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
>   at java.lang.Thread.run(Thread.java:745)





[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974207#comment-15974207
 ] 

mingleizhang edited comment on FLINK-6311 at 4/19/17 7:13 AM:
--

[~tzulitai] Hi, here is some sample code. Please take a look. What do you 
think? BTW, should we use more detailed error messages instead of just 
"mainThread is null"? If so, what kind of messages should we put here? 
Thanks ~ :D
{code}
public void shutdownFetcher() {
    running = false;
    checkNotNull(mainThread, "mainThread is null.");
    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

    if (LOG.isInfoEnabled()) {
        LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
    }
    checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null.");
    shardConsumersExecutor.shutdownNow();
}
{code}


was (Author: mingleizhang):
[~tzulitai] Hi, here is some sample code. Please take a look. What do you 
think? BTW, should we use more detailed error messages instead of just 
"mainThread is null"? If so, what kind of message should we put here? 
Thanks ~ :D
{code}
public void shutdownFetcher() {
    running = false;
    checkNotNull(mainThread, "mainThread is null.");
    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

    if (LOG.isInfoEnabled()) {
        LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
    }
    checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null.");
    shardConsumersExecutor.shutdownNow();
}
{code}

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.
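
A minimal sketch of the null checks the description asks for, written as guards that skip the call rather than preconditions that throw. {{FetcherShutdownSketch}} and its {{start}} and {{isRunning}} methods are hypothetical stand-ins; the field names mirror the description, but this is not the KinesisDataFetcher implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a fetcher that may be shut down before it ever ran.
class FetcherShutdownSketch {
    private volatile boolean running = true;
    private volatile Thread mainThread;                      // null until start() is called
    private volatile ExecutorService shardConsumersExecutor; // null until start() is called

    // Simulates the fetcher starting to run.
    void start(Thread main, ExecutorService executor) {
        this.mainThread = main;
        this.shardConsumersExecutor = executor;
    }

    // Safe to call whether or not start() has happened.
    void shutdownFetcher() {
        running = false;

        Thread thread = mainThread; // read once so check and use see the same value
        if (thread != null) {
            thread.interrupt();
        }

        ExecutorService executor = shardConsumersExecutor;
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        FetcherShutdownSketch neverStarted = new FetcherShutdownSketch();
        neverStarted.shutdownFetcher(); // no NullPointerException
        System.out.println("running: " + neverStarted.isRunning());

        FetcherShutdownSketch started = new FetcherShutdownSketch();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        started.start(new Thread(() -> { }), executor);
        started.shutdownFetcher();
        System.out.println("executor shut down: " + executor.isShutdown());
    }
}
```

A precondition such as checkNotNull would still raise a NullPointerException when the source is closed before it runs, only with a clearer message, so a guard is the variant that makes the early close a no-op.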





[jira] [Commented] (FLINK-6319) Add timeout when shutting SystemProcessingTimeService down

2017-04-22 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979854#comment-15979854
 ] 

mingleizhang commented on FLINK-6319:
-

+1
I have to say, the {code}shutdownNow{code} method has its limitations. 

> Add timeout when shutting SystemProcessingTimeService down
> --
>
> Key: FLINK-6319
> URL: https://issues.apache.org/jira/browse/FLINK-6319
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Priority: Minor
>
> A user noted that we simply call {{shutdownNow}} on the 
> {{SystemProcessingTimeService}}'s {{ScheduledThreadPoolExecutor}} when 
> calling {{SystemProcessingTimeService.shutdownService}}. {{shutdownNow}} will 
> halt all waiting tasks but it won't wait until the currently running tasks 
> have been completed. This can lead to unwanted runtime behaviours such as 
> wrong termination orders when shutting down tasks (as reported in 
> https://issues.apache.org/jira/browse/FLINK-4973?focusedCommentId=15965884=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15965884).
> I propose to add a small timeout to wait for currently running tasks to 
> complete. Even though this problem cannot be completely solved since timer 
> tasks might take longer than the specified timeout, a timeout for waiting for 
> running tasks to complete will mitigate the problem.
> We can do this by calling {{timerServicer.awaitTermination(timeout, 
> timeoutUnit);}} after the {{shutdownNow}} call.
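
A minimal sketch of the proposed change, using only java.util.concurrent: call shutdownNow, then wait a bounded time for running tasks to finish. The class and method names ({{TimerShutdownSketch}}, {{shutdownAndAwait}}) are hypothetical, and the timeout values are arbitrary.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TimerShutdownSketch {
    // Returns true if the executor terminated within the timeout.
    static boolean shutdownAndAwait(ScheduledThreadPoolExecutor timerService,
                                    long timeout, TimeUnit timeoutUnit) {
        // Cancels waiting tasks and interrupts running ones, but does not wait for them.
        timerService.shutdownNow();
        try {
            // Bounded wait for tasks that are still running to complete.
            return timerService.awaitTermination(timeout, timeoutUnit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
        timerService.schedule(() -> { }, 1, TimeUnit.HOURS); // a waiting timer task
        boolean terminated = shutdownAndAwait(timerService, 1, TimeUnit.SECONDS);
        System.out.println("terminated in time: " + terminated);
    }
}
```

As the description notes, this only mitigates the problem: a task that runs longer than the timeout is still running when the method returns false.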





[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-23 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980624#comment-15980624
 ] 

mingleizhang commented on FLINK-5855:
-

[~tedyu] Hi, Ted. A question: if the lock on {code}pendingFilesPerCheckpoint{code} 
is obtained prior to the call to {code}handlePendingFilesForPreviousCheckpoints{code}, 
could all elements already have been removed from the map by the time 
{code}handlePendingFilesForPreviousCheckpoints{code} is triggered? Would that 
cause a problem here? CC [~till.rohrmann]

> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().





[jira] [Assigned] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-23 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-5855:
---

Assignee: mingleizhang

> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().





[jira] [Comment Edited] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-23 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980624#comment-15980624
 ] 

mingleizhang edited comment on FLINK-5855 at 4/24/17 1:50 AM:
--

[~tedyu] Hi, Ted. A question: if the lock on {code}pendingFilesPerCheckpoint{code} 
is obtained prior to the call to {code}handlePendingFilesForPreviousCheckpoints{code}, 
could all elements already have been removed from the map by the time 
{code}handlePendingFilesForPreviousCheckpoints{code} is triggered? Would that 
cause a problem here? CC [~till.rohrmann]


was (Author: mingleizhang):
[~tedyu] Hi, Ted. A question: if the lock on {code}pendingFilesPerCheckpoint{code} 
is obtained prior to the call to {code}handlePendingFilesForPreviousCheckpoints{code}, 
could all elements already have been removed from the map by the time 
{code}handlePendingFilesForPreviousCheckpoints{code} is triggered? Would that 
cause a problem here? CC [~till.rohrmann]

> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().





[jira] [Comment Edited] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-23 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980664#comment-15980664
 ] 

mingleizhang edited comment on FLINK-5855 at 4/24/17 2:57 AM:
--

Yep. That is right. So, following your suggestion, I would write the code 
snippet like this. Correct?
{code}
synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
}

handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
{code}


was (Author: mingleizhang):
@Ted Yu, Yep. That is right. So, following your suggestion, I would write the 
code snippet like this. Correct?
{code}
synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
}

handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
{code}

> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().





[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-23 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980664#comment-15980664
 ] 

mingleizhang commented on FLINK-5855:
-

@Ted Yu, Yep. That is right. So, following your suggestion, I would write the 
code snippet like this. Correct?
{code}
synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
}

handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
{code}

> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().





[jira] [Comment Edited] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-23 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980664#comment-15980664
 ] 

mingleizhang edited comment on FLINK-5855 at 4/24/17 2:57 AM:
--

Yep. That is right. So, following your suggestion, I would write the code 
snippet like this. Correct?
{code}
synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
}

handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
{code}


was (Author: mingleizhang):
Yep. That is right. So, following your suggestion, I would write the code 
snippet like this. Correct?
{code}
synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
}

handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
{code}

> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().





[jira] [Issue Comment Deleted] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-23 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-5855:

Comment: was deleted

(was: Yep. That is right. So, following your suggestion, I would write the code 
snippet like this. Correct?
{code}
synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
}

handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
{code})

> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().





[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-23 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980668#comment-15980668
 ] 

mingleizhang commented on FLINK-5855:
-

I think the following code snippet follows your suggestion. What do you think?

{code}
synchronized (restoredState.pendingFilesPerCheckpoint) {
  handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
  restoredState.pendingFilesPerCheckpoint.clear();
}
{code}
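
For reference, a self-contained sketch of the same ordering: acquire the lock first, handle the pending files, then clear the map. {{PendingFilesSketch}}, its {{handled}} list, and the simplified map type are hypothetical stand-ins, not the BucketingSink code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the restored state; types are simplified assumptions.
class PendingFilesSketch {
    final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    final List<String> handled = new ArrayList<>();

    // Stand-in for the real handler: records every pending file it sees.
    void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pending) {
        for (List<String> files : pending.values()) {
            handled.addAll(files);
        }
    }

    // The handler reads the map and clear() mutates it, so both run under one lock.
    void restore() {
        synchronized (pendingFilesPerCheckpoint) {
            handlePendingFilesForPreviousCheckpoints(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }

    public static void main(String[] args) {
        PendingFilesSketch state = new PendingFilesSketch();
        List<String> files = new ArrayList<>();
        files.add("part-0");
        state.pendingFilesPerCheckpoint.put(1L, files);
        state.restore();
        System.out.println(state.handled + " / remaining: " + state.pendingFilesPerCheckpoint.size());
    }
}
```

Clearing the map before the handler runs, as in the earlier draft of this comment, would hand the handler an empty map, so the order inside the block matters as much as the lock.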

> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().





[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-23 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980671#comment-15980671
 ] 

mingleizhang commented on FLINK-5855:
-

Okay, thanks. I will open a PR for this. I don't really understand why there 
are so many concurrency and locking issues in Flink, if these really are problems.

> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()

2017-04-17 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971966#comment-15971966
 ] 

mingleizhang commented on FLINK-5943:
-

[~tedyu] Thanks, I appreciate it. I will open a PR soon.

> Unprotected access to haServices in 
> YarnFlinkApplicationMasterRunner#shutdown()
> ---
>
> Key: FLINK-5943
> URL: https://issues.apache.org/jira/browse/FLINK-5943
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   protected void shutdown(ApplicationStatus status, String msg) {
> // Need to clear the job state in the HA services before shutdown
> try {
>   haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
> }
> {code}
> The access to haServices is without lock protection.
> haServices may have been closed.
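
One way to read the two points above is sketched below: every access to 
haServices happens under a lock, and a closed instance is never touched again. 
This is only an illustration. HaServicesStub, its methods and the String job 
IDs are hypothetical and do not mirror the real YarnFlinkApplicationMasterRunner.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of lock-protected shutdown; not Flink's actual runner. */
final class HaServicesShutdownSketch {

    /** Hypothetical stand-in for the HA services and their running-jobs registry. */
    static final class HaServicesStub {
        private final Set<String> runningJobs = new HashSet<>();
        private boolean closed;

        void registerJob(String jobId) {
            runningJobs.add(jobId);
        }

        void clearJob(String jobId) {
            if (closed) {
                throw new IllegalStateException("HA services already closed");
            }
            runningJobs.remove(jobId);
        }

        boolean hasJob(String jobId) {
            return runningJobs.contains(jobId);
        }

        void close() {
            closed = true;
        }
    }

    private final Object lock = new Object();
    private HaServicesStub haServices; // guarded by lock; set to null once closed

    HaServicesShutdownSketch(HaServicesStub haServices) {
        this.haServices = haServices;
    }

    /** Clears the job state and closes the services; returns false if already shut down. */
    boolean shutdown(String jobId) {
        synchronized (lock) {
            if (haServices == null) {
                return false; // another caller already closed the services
            }
            haServices.clearJob(jobId);
            haServices.close();
            haServices = null;
            return true;
        }
    }
}
```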





[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974207#comment-15974207
 ] 

mingleizhang edited comment on FLINK-6311 at 4/19/17 7:14 AM:
--

[~tzulitai] Hi, here is the sample code. Please take a look and let me know 
what you think. BTW, should we use more descriptive error messages than just 
"mainThread is null"? If that is necessary, what kind of message should we put 
here? Thanks ~ :D
{code}
public void shutdownFetcher() {
    running = false;
    checkNotNull(mainThread, "mainThread is null.");
    // the main thread may be sleeping for the discovery interval
    mainThread.interrupt();

    if (LOG.isInfoEnabled()) {
        LOG.info("Shutting down the shard consumer threads of subtask {} ...",
            indexOfThisConsumerSubtask);
    }
    checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null.");
    shardConsumersExecutor.shutdownNow();
}
{code}


was (Author: mingleizhang):
[~tzulitai] Hi, here is the sample code I put here, please check it out and how 
do you think about it ? BTW,  Should we put more error messages instead of just 
"mainThread is null" ? If it does really, what kinda messages should we put it 
here? Thanks ~ :D
{code}
public void shutdownFetcher() {
    running = false;
    checkNotNull(mainThread, "mainThread is null.");
    // the main thread may be sleeping for the discovery interval
    mainThread.interrupt();

    if (LOG.isInfoEnabled()) {
        LOG.info("Shutting down the shard consumer threads of subtask {} ...",
            indexOfThisConsumerSubtask);
    }
    checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null.");
    shardConsumersExecutor.shutdownNow();
}
{code}

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.
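
A minimal sketch of the null checks described above follows. Note that a 
checkNotNull-style precondition typically throws a NullPointerException itself 
(Guava's Preconditions.checkNotNull does), so it would not avoid the NPE when 
the source is closed before it runs. The sketch instead skips the fields that 
were never initialised. FetcherShutdownSketch and its start() signature are 
hypothetical; only the field names are taken from the issue text.

```java
import java.util.concurrent.ExecutorService;

/** Hypothetical, simplified stand-in for KinesisDataFetcher; only shutdown is modelled. */
final class FetcherShutdownSketch {

    private volatile boolean running = true;
    private volatile Thread mainThread;                      // null until the fetcher runs
    private volatile ExecutorService shardConsumersExecutor; // null until the fetcher runs

    /** Assumed entry point that records the thread and executor once the source runs. */
    void start(Thread mainThread, ExecutorService shardConsumersExecutor) {
        this.mainThread = mainThread;
        this.shardConsumersExecutor = shardConsumersExecutor;
    }

    /** Safe to call even if start() was never called. */
    void shutdownFetcher() {
        running = false;

        Thread thread = mainThread; // read once into a local to avoid a race
        if (thread != null) {
            thread.interrupt(); // the main thread may be sleeping
        }

        ExecutorService executor = shardConsumersExecutor;
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    boolean isRunning() {
        return running;
    }
}
```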





[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974012#comment-15974012
 ] 

mingleizhang commented on FLINK-6311:
-

[~tzulitai] I just looked at FlinkKinesisConsumer, which is in the package 
{code}org.apache.flink.streaming.connectors.kinesis{code} in the module 
flink-connector-kinesis. flink-connector-kinesis sits under the 
flink-connectors module, whose pom.xml does not list 
{code}flink-connector-kinesis{code}. I think we should add the module 
{code}flink-connector-kinesis{code} to the flink-connectors pom.xml and then 
return to this issue. What do you think?

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.





[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974207#comment-15974207
 ] 

mingleizhang commented on FLINK-6311:
-

[~tzulitai] Hi, here is the sample code. Please take a look and let me know 
what you think. BTW, should we use more descriptive error messages than just 
"mainThread is null"? If so, what kind of message should we put here? 
Thanks ~ :D
{code}
public void shutdownFetcher() {
    running = false;
    checkNotNull(mainThread, "mainThread is null.");
    // the main thread may be sleeping for the discovery interval
    mainThread.interrupt();

    if (LOG.isInfoEnabled()) {
        LOG.info("Shutting down the shard consumer threads of subtask {} ...",
            indexOfThisConsumerSubtask);
    }
    checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null.");
    shardConsumersExecutor.shutdownNow();
}
{code}

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.





[jira] [Comment Edited] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973937#comment-15973937
 ] 

mingleizhang edited comment on FLINK-6130 at 4/19/17 2:36 AM:
--

[~Zentol] [~till.rohrmann] That makes sense to me now. I have concluded that 
the earlier approach, {code}Object result = future.value().get();{code}, is 
pointless, since I cannot get any useful information from it. Thanks, I 
appreciate it.


was (Author: mingleizhang):
[~Zentol] [~till.rohrmann] That makes sense to me now. So, I just have decided 
that the previous practice {code}Object result = future.value().get();{code} is 
meaningless as I can not any useful message from it. Thanks and appreciate it.

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.
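
The suggestion above can be sketched as follows: read the future while holding 
the lock, then wait on it outside the lock so that other threads are not 
blocked. This is an illustration under assumptions. ResourceManagerStub is a 
hypothetical stand-in, and java.util.concurrent.CompletableFuture is used here 
in place of whatever future type the real resource manager returns.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch; not the real YarnFlinkApplicationMasterRunner. */
final class TerminationFutureSketch {

    /** Hypothetical stand-in for the resource manager. */
    static final class ResourceManagerStub {
        private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        CompletableFuture<Void> getTerminationFuture() {
            return terminationFuture;
        }

        void shutDown() {
            terminationFuture.complete(null);
        }
    }

    private final Object lock = new Object();
    private ResourceManagerStub resourceManager; // guarded by lock

    /** Starts the services and captures the termination future under the lock. */
    CompletableFuture<Void> startAndGetTerminationFuture() {
        synchronized (lock) {
            resourceManager = new ResourceManagerStub();
            // The guarded field is read only while the lock is held.
            return resourceManager.getTerminationFuture();
        }
    }

    /** Shuts the resource manager down, again under the lock. */
    void shutdown() {
        synchronized (lock) {
            if (resourceManager != null) {
                resourceManager.shutDown();
            }
        }
    }
}
```

A caller would then block with get() on the returned future after the 
synchronized block has been left, not inside it.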





[jira] [Assigned] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-18 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6311:
---

Assignee: mingleizhang

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.





[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974124#comment-15974124
 ] 

mingleizhang commented on FLINK-6311:
-

[~tzulitai] Thanks for sharing such useful information; I really appreciate 
it. Yes, I would like to work on this and will start soon.

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.





[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held

2017-04-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973937#comment-15973937
 ] 

mingleizhang commented on FLINK-6130:
-

[~Zentol] [~till.rohrmann] That makes sense to me now. I have concluded that 
the earlier approach, {code}Object result = future.value().get();{code}, is 
pointless, since I cannot get any useful information from it. Thanks, I 
appreciate it.

> Consider calling resourceManager#getTerminationFuture() with lock held
> --
>
> Key: FLINK-6130
> URL: https://issues.apache.org/jira/browse/FLINK-6130
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> In YarnFlinkApplicationMasterRunner#runApplicationMaster() :
> {code}
>   synchronized (lock) {
> LOG.info("Starting High Availability Services");
> ...
>   }
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> resourceManager#getTerminationFuture() is called without holding lock.
> We should store the value returned from 
> resourceManager#getTerminationFuture() inside the synchronized block.





[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974385#comment-15974385
 ] 

mingleizhang commented on FLINK-6311:
-

[~tzulitai] Thanks, I will open a PR for this issue soon. I really appreciate 
it.

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.





[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974012#comment-15974012
 ] 

mingleizhang edited comment on FLINK-6311 at 4/19/17 10:02 AM:
---

[~tzulitai] I just looked at FlinkKinesisConsumer, which is in the package 
{code}org.apache.flink.streaming.connectors.kinesis{code} in the module 
flink-connector-kinesis. flink-connector-kinesis sits under the 
flink-connectors module, whose pom.xml does not list 
{code}flink-connector-kinesis{code}. I think we should add the module 
{code}flink-connector-kinesis{code} to the flink-connectors pom.xml and then 
return to this issue. What do you think?


was (Author: mingleizhang):
[~tzulitai] I just watch FlinkKinesisConsumer which under the package of 
{code}org.apache.flink.streaming.connectors.kinesis{code} in the module 
flink-connector-kinesis.  And the flink-connector-kinesis is under the module 
of flink-connectors in which pom.xml dont contains the 
{code}flink-connector-kinesis{code}. I would think we should add the module 
{code} flink-connector-kinesis {code} in flink-connectors pom.xml and then 
return to this issue. How do you think of this ? 

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.





[jira] [Updated] (FLINK-7302) Failed to run CorrelateITCase class under windows environment

2017-07-31 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-7302:

Description: 
Error:(220, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
it is imported twice in the same scope by
import org.apache.flink.table.utils._
and import org.apache.flink.table.runtime.utils._
UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> 
"#"))

Error:(240, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
it is imported twice in the same scope by
import org.apache.flink.table.utils._
and import org.apache.flink.table.runtime.utils._
UserDefinedFunctionTestUtils.setJobParameters(

Error:(107, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
it is imported twice in the same scope by
import org.apache.flink.table.utils._
and import org.apache.flink.table.runtime.utils._
UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> 
" "))

Error:(129, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
it is imported twice in the same scope by
import org.apache.flink.table.utils._
and import org.apache.flink.table.runtime.utils._
UserDefinedFunctionTestUtils.setJobParameters(


  was:
Error:(220, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
it is imported twice in the same scope by
import org.apache.flink.table.utils._
and import org.apache.flink.table.runtime.utils._
UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> 
"#"))



> Failed to run CorrelateITCase class under windows environment
> -
>
> Key: FLINK-7302
> URL: https://issues.apache.org/jira/browse/FLINK-7302
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
> Environment: Windows 7 
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> Error:(220, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
> it is imported twice in the same scope by
> import org.apache.flink.table.utils._
> and import org.apache.flink.table.runtime.utils._
> UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" 
> -> "#"))
> Error:(240, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
> it is imported twice in the same scope by
> import org.apache.flink.table.utils._
> and import org.apache.flink.table.runtime.utils._
> UserDefinedFunctionTestUtils.setJobParameters(
> Error:(107, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
> it is imported twice in the same scope by
> import org.apache.flink.table.utils._
> and import org.apache.flink.table.runtime.utils._
> UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" 
> -> " "))
> Error:(129, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
> it is imported twice in the same scope by
> import org.apache.flink.table.utils._
> and import org.apache.flink.table.runtime.utils._
> UserDefinedFunctionTestUtils.setJobParameters(



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7302) Failed to run CorrelateITCase class under windows environment

2017-07-31 Thread mingleizhang (JIRA)
mingleizhang created FLINK-7302:
---

 Summary: Failed to run CorrelateITCase class under windows 
environment
 Key: FLINK-7302
 URL: https://issues.apache.org/jira/browse/FLINK-7302
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
 Environment: Windows 7 
Reporter: mingleizhang
Assignee: mingleizhang


Error:(220, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
it is imported twice in the same scope by
import org.apache.flink.table.utils._
and import org.apache.flink.table.runtime.utils._
UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> 
"#"))






[jira] [Commented] (FLINK-7297) Instable Kafka09ProducerITCase.testCustomPartitioning test case

2017-07-31 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107062#comment-16107062
 ] 

mingleizhang commented on FLINK-7297:
-

I don't know why, but every time it takes me several hours to load the job 
log, and I still cannot see it because it never finishes loading.

> Instable Kafka09ProducerITCase.testCustomPartitioning test case
> ---
>
> Key: FLINK-7297
> URL: https://issues.apache.org/jira/browse/FLINK-7297
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> There seems to be a test instability of 
> {{Kafka09ProducerITCase>KafkaProducerTestBase.testCustomPartitioning}} on 
> Travis.
> https://travis-ci.org/apache/flink/jobs/258538636





[jira] [Comment Edited] (FLINK-7297) Instable Kafka09ProducerITCase.testCustomPartitioning test case

2017-07-31 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107062#comment-16107062
 ] 

mingleizhang edited comment on FLINK-7297 at 7/31/17 9:56 AM:
--

I don't know why, but every time it takes me several hours to load the job 
log, and I still cannot see it because it never finishes loading. I have never 
managed to view the log.


was (Author: mingleizhang):
Every time, I dont know why It would take me several hours to see Job log. And 
I can not see it as it takes me so long time. Always loading

> Instable Kafka09ProducerITCase.testCustomPartitioning test case
> ---
>
> Key: FLINK-7297
> URL: https://issues.apache.org/jira/browse/FLINK-7297
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> There seems to be a test instability of 
> {{Kafka09ProducerITCase>KafkaProducerTestBase.testCustomPartitioning}} on 
> Travis.
> https://travis-ci.org/apache/flink/jobs/258538636





[jira] [Commented] (FLINK-7297) Instable Kafka09ProducerITCase.testCustomPartitioning test case

2017-07-31 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107059#comment-16107059
 ] 

mingleizhang commented on FLINK-7297:
-

It is just an XML file instead.

> Instable Kafka09ProducerITCase.testCustomPartitioning test case
> ---
>
> Key: FLINK-7297
> URL: https://issues.apache.org/jira/browse/FLINK-7297
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> There seems to be a test instability of 
> {{Kafka09ProducerITCase>KafkaProducerTestBase.testCustomPartitioning}} on 
> Travis.
> https://travis-ci.org/apache/flink/jobs/258538636





[jira] [Commented] (FLINK-7092) Shutdown ResourceManager components properly

2017-07-29 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106216#comment-16106216
 ] 

mingleizhang commented on FLINK-7092:
-

Thanks, Till. I have been back at home these days. :) I am a bit shy.

> Shutdown ResourceManager components properly
> 
>
> Key: FLINK-7092
> URL: https://issues.apache.org/jira/browse/FLINK-7092
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The {{MesosResourceManager}} starts internally a {{TaskMonitor}}, 
> {{LaunchCoordinator}}, {{ConnectionMonitor}} and a 
> {{ReconciliationCoordinator}}. These components have to be properly shut down 
> when the {{MesosResourceManager}} closes.





[jira] [Resolved] (FLINK-7302) Failed to run CorrelateITCase class under windows environment

2017-08-03 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang resolved FLINK-7302.
-
Resolution: Won't Fix

> Failed to run CorrelateITCase class under windows environment
> -
>
> Key: FLINK-7302
> URL: https://issues.apache.org/jira/browse/FLINK-7302
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
> Environment: Windows 7 
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> Error:(220, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
> it is imported twice in the same scope by
> import org.apache.flink.table.utils._
> and import org.apache.flink.table.runtime.utils._
> UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" 
> -> "#"))
> Error:(240, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
> it is imported twice in the same scope by
> import org.apache.flink.table.utils._
> and import org.apache.flink.table.runtime.utils._
> UserDefinedFunctionTestUtils.setJobParameters(
> Error:(107, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
> it is imported twice in the same scope by
> import org.apache.flink.table.utils._
> and import org.apache.flink.table.runtime.utils._
> UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" 
> -> " "))
> Error:(129, 5) reference to UserDefinedFunctionTestUtils is ambiguous;
> it is imported twice in the same scope by
> import org.apache.flink.table.utils._
> and import org.apache.flink.table.runtime.utils._
> UserDefinedFunctionTestUtils.setJobParameters(





[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation

2017-08-04 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114213#comment-16114213
 ] 

mingleizhang commented on FLINK-6995:
-

Ahhha. I see. Thanks ~

> Add a warning to outdated documentation
> ---
>
> Key: FLINK-6995
> URL: https://issues.apache.org/jira/browse/FLINK-6995
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: mingleizhang
>
> When I search for "flink yarn" on Google, the first result is an outdated 0.8 
> release documentation page. We should add a warning to outdated documentation 
> pages.
> There are other problems as well:
> The main page only links to 1.3 and 1.4 but the flink-docs-master 
> documentation links to 1.3, 1.2, 1.1, and 1.0. But each of those packages 
> only links to older releases so if a user arrives on a 1.2 page they won't 
> see 1.3.





[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation

2017-08-04 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114134#comment-16114134
 ] 

mingleizhang commented on FLINK-6995:
-

I am planning to check out a new branch based on the {{release-1.2}} branch 
and work on that branch. If I'm wrong, please let me know. 

> Add a warning to outdated documentation
> ---
>
> Key: FLINK-6995
> URL: https://issues.apache.org/jira/browse/FLINK-6995
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: mingleizhang
>
> When I search for "flink yarn" on Google, the first result is an outdated 0.8 
> release documentation page. We should add a warning to outdated documentation 
> pages.
> There are other problems as well:
> The main page only links to 1.3 and 1.4 but the flink-docs-master 
> documentation links to 1.3, 1.2, 1.1, and 1.0. But each of those packages 
> only links to older releases so if a user arrives on a 1.2 page they won't 
> see 1.3.





[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation

2017-08-04 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114184#comment-16114184
 ] 

mingleizhang commented on FLINK-6995:
-

Hi [~alpinegizmo], you said that the warning has to be enabled by setting 
{{site.is_latest}} to false. My question is: where do we set it in our code? I 
checked the {{release-1.2}} branch, and the warning already exists there. Also, 
https://flink.apache.org/q/stable-docs.html already points to 1.3. I think this 
work may already be done, but I am not sure. Could you take a look?

> Add a warning to outdated documentation
> ---
>
> Key: FLINK-6995
> URL: https://issues.apache.org/jira/browse/FLINK-6995
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: mingleizhang
>
> When I search for "flink yarn" on Google, the first result is an outdated 0.8 
> release documentation page. We should add a warning to outdated documentation 
> pages.
> There are other problems as well:
> The main page only links to 1.3 and 1.4 but the flink-docs-master 
> documentation links to 1.3, 1.2, 1.1, and 1.0. But each of those packages 
> only links to older releases so if a user arrives on a 1.2 page they won't 
> see 1.3.





[jira] [Created] (FLINK-7162) Tests should not write outside 'target' directory.

2017-07-12 Thread mingleizhang (JIRA)
mingleizhang created FLINK-7162:
---

 Summary: Tests should not write outside 'target' directory.
 Key: FLINK-7162
 URL: https://issues.apache.org/jira/browse/FLINK-7162
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Reporter: mingleizhang
Assignee: mingleizhang








[jira] [Updated] (FLINK-7162) Tests should not write outside 'target' directory.

2017-07-12 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-7162:

Issue Type: Improvement  (was: Bug)

> Tests should not write outside 'target' directory.
> --
>
> Key: FLINK-7162
> URL: https://issues.apache.org/jira/browse/FLINK-7162
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> A few tests use Files.createTempDir() from Guava package, but do not set 
> java.io.tmpdir system property. Thus the temp directory is created in 
> unpredictable places and is not being cleaned up by {{mvn clean}}.
> This was probably introduced in {{JobManagerStartupTest}} and then replicated 
> in {{BlobUtilsTest}}.
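
One possible way to keep test directories inside the build output, sketched 
here with the JDK's java.nio.file API and not with Guava, is to create them 
under an explicit base directory. The class name TargetTempDirSketch and the 
"target/test-tmp" location are assumptions for illustration and are not taken 
from the Flink code base.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper; creates temp directories below the Maven 'target' directory. */
final class TargetTempDirSketch {

    /** Assumed base directory, relative to the module's working directory. */
    private static final Path BASE = Paths.get("target", "test-tmp");

    /** Creates a fresh directory under target/test-tmp, so that 'mvn clean' removes it. */
    static Path createTempDirUnderTarget(String prefix) {
        try {
            Files.createDirectories(BASE); // no-op if the directory already exists
            return Files.createTempDirectory(BASE, prefix);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Setting the java.io.tmpdir system property for the test JVM, as the 
description mentions, is the alternative that leaves the test code unchanged.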





[jira] [Updated] (FLINK-7162) Tests should not write outside 'target' directory.

2017-07-12 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-7162:

Description: 
A few tests use Files.createTempDir() from Guava package, but do not set 
java.io.tmpdir system property. Thus the temp directory is created in 
unpredictable places and is not being cleaned up by {{mvn clean}}.

This was probably introduced in {{JobManagerStartupTest}} and then replicated 
in {{BlobUtilsTest}}.

> Tests should not write outside 'target' directory.
> --
>
> Key: FLINK-7162
> URL: https://issues.apache.org/jira/browse/FLINK-7162
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> A few tests use Files.createTempDir() from Guava package, but do not set 
> java.io.tmpdir system property. Thus the temp directory is created in 
> unpredictable places and is not being cleaned up by {{mvn clean}}.
> This was probably introduced in {{JobManagerStartupTest}} and then replicated 
> in {{BlobUtilsTest}}.





[jira] [Updated] (FLINK-6934) Consider moving LRUCache and it's test classes

2017-07-12 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-6934:

Summary: Consider moving LRUCache and it's test classes  (was: Consider 
moving LRUCache class)

> Consider moving LRUCache and it's test classes
> --
>
> Key: FLINK-6934
> URL: https://issues.apache.org/jira/browse/FLINK-6934
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: mingleizhang
>Assignee: mingleizhang
> Fix For: 1.4.0
>
>
> The LRUCache class is no longer used, so I suggest removing it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6378) Implement FLIP-6 Flink-on-Mesos

2017-07-10 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081702#comment-16081702
 ] 

mingleizhang commented on FLINK-6378:
-

Why are we going to remove the Akka dependency?

> Implement FLIP-6 Flink-on-Mesos
> ---
>
> Key: FLINK-6378
> URL: https://issues.apache.org/jira/browse/FLINK-6378
> Project: Flink
>  Issue Type: New Feature
>  Components: Mesos
>Reporter: Eron Wright 
>  Labels: flip-6
>
> This is the parent issue for implementing Flink on Mesos using the new FLIP-6 
> architecture.
> This covers individual jobs running as Mesos frameworks, where the framework 
> and job lifetime are coupled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7139) Refine currentTimeMillis to nanoTime

2017-07-08 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079084#comment-16079084
 ] 

mingleizhang commented on FLINK-7139:
-

Hello [~Zentol], what do you think?

> Refine currentTimeMillis to nanoTime
> 
>
> Key: FLINK-7139
> URL: https://issues.apache.org/jira/browse/FLINK-7139
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: mingleizhang
>Assignee: mingleizhang
>Priority: Minor
>
> When measuring intervals of time it's much better and safer to use 
> {{System.nanoTime()}} than {{System.currentTimeMillis()}}. The latter is 
> affected by system clock drift and adjustments (for example, by a system time 
> daemon), while the former is much more precise and stable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7139) Refine currentTimeMillis to nanoTime

2017-07-08 Thread mingleizhang (JIRA)
mingleizhang created FLINK-7139:
---

 Summary: Refine currentTimeMillis to nanoTime
 Key: FLINK-7139
 URL: https://issues.apache.org/jira/browse/FLINK-7139
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Reporter: mingleizhang
Assignee: mingleizhang
Priority: Minor


When measuring intervals of time it's much better and safer to use 
{{System.nanoTime()}} than {{System.currentTimeMillis()}}. The latter is 
affected by system clock drift and adjustments (for example, by a system time 
daemon), while the former is much more precise and stable.
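
The pattern described above can be sketched as follows. This is an illustration only: the helper name {{ElapsedTime}} is hypothetical and does not come from the Flink code base. Note that {{System.nanoTime()}} values are only meaningful as the difference between two readings in the same JVM, not as wall-clock timestamps.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not taken from the Flink code base.
final class ElapsedTime {

    private ElapsedTime() {}

    // Runs the task and returns how long it took, in milliseconds.
    static long measureMillis(Runnable task) {
        // Take both readings from nanoTime() and only use their difference.
        long start = System.nanoTime();
        task.run();
        long elapsedNanos = System.nanoTime() - start;
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }
}
```

Because the result is a difference of two {{nanoTime()}} readings, an adjustment of the wall clock while the task runs does not affect it.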



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7134) Remove hadoop1.x code in mapreduce.utils.HadoopUtils

2017-07-07 Thread mingleizhang (JIRA)
mingleizhang created FLINK-7134:
---

 Summary: Remove hadoop1.x code in mapreduce.utils.HadoopUtils
 Key: FLINK-7134
 URL: https://issues.apache.org/jira/browse/FLINK-7134
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Reporter: mingleizhang
Assignee: mingleizhang


This issue is similar to FLINK-7118. I split the work into two issues so that 
each one is clearer and easier to review.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7121) Remove the hardcoded way in core

2017-07-07 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16078189#comment-16078189
 ] 

mingleizhang commented on FLINK-7121:
-

OK.

> Remove the hardcoded way in core
> 
>
> Key: FLINK-7121
> URL: https://issues.apache.org/jira/browse/FLINK-7121
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: mingleizhang
>Assignee: mingleizhang
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()

2017-07-12 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085030#comment-16085030
 ] 

mingleizhang commented on FLINK-6493:
-

Thanks, Ted. I guess {{partitionStateSerializerConfigSnapshot.equals()}} may 
also lead to an NPE.

> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
> -
>
> Key: FLINK-6493
> URL: https://issues.apache.org/jira/browse/FLINK-6493
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> && ((partitionStateSerializer == null && ((Snapshot) 
> obj).getPartitionStateSerializer() == null)
>   || partitionStateSerializer.equals(((Snapshot) 
> obj).getPartitionStateSerializer()))
> && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot() == null)
>   || partitionStateSerializerConfigSnapshot.equals(((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot()));
> {code}
> The null check for partitionStateSerializer / 
> partitionStateSerializerConfigSnapshot is in combination with another clause.
> This may lead to NPE in the partitionStateSerializer.equals() call.
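
One possible null-safe form of such a comparison uses {{java.util.Objects.equals}}. The sketch below is an assumption about how a fix could look, not the actual Flink change. {{SnapshotSketch}} is a simplified, hypothetical stand-in for the class in the report, and its field types are placeholders.

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for the class in the report.
final class SnapshotSketch {

    private final Object partitionStateSerializer;
    private final Object partitionStateSerializerConfigSnapshot;

    SnapshotSketch(Object serializer, Object configSnapshot) {
        this.partitionStateSerializer = serializer;
        this.partitionStateSerializerConfigSnapshot = configSnapshot;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SnapshotSketch)) {
            return false;
        }
        SnapshotSketch other = (SnapshotSketch) obj;
        // Objects.equals returns true if both arguments are null, false if
        // exactly one is null, and otherwise calls equals() on the first.
        return Objects.equals(partitionStateSerializer, other.partitionStateSerializer)
            && Objects.equals(partitionStateSerializerConfigSnapshot,
                other.partitionStateSerializerConfigSnapshot);
    }

    @Override
    public int hashCode() {
        return Objects.hash(partitionStateSerializer, partitionStateSerializerConfigSnapshot);
    }
}
```

With this form, comparing an instance whose serializer is null against one whose serializer is set returns false instead of throwing.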



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()

2017-07-12 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085054#comment-16085054
 ] 

mingleizhang edited comment on FLINK-6493 at 7/13/17 2:29 AM:
--

Actually, sorry. I don't think the null check is ineffective; it is just 
combined with another clause. The non-empty constructor uses 
{{Preconditions.checkNotNull}}, while the empty constructor has no null check. 
Even so, calling {{partitionStateSerializer.equals()}} would still throw an NPE 
if {{partitionStateSerializer}} is null and 
{{((Snapshot) obj).getPartitionStateSerializer() == null}} evaluates to false. 
So I believe that in this case we should return false instead of throwing an NPE.


was (Author: mingleizhang):
Actually, Sorry. I dont think null check is ineffective, as it just combination 
with another clause which belongs to not empty constructor has 
{{Preconditions.checkNotNull}}. In empty constructor, it does not have a null 
check. Even though,it would be still throw an NPE when call 
{{partitionStateSerializer.equals()}} if {{partitionStateSerializer}} is null 
and {{((Snapshot) obj).getPartitionStateSerializer() == null}} return a false. 
So, I believe in this case, we should return a false instead throw an NPE.

> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
> -
>
> Key: FLINK-6493
> URL: https://issues.apache.org/jira/browse/FLINK-6493
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> && ((partitionStateSerializer == null && ((Snapshot) 
> obj).getPartitionStateSerializer() == null)
>   || partitionStateSerializer.equals(((Snapshot) 
> obj).getPartitionStateSerializer()))
> && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot() == null)
>   || partitionStateSerializerConfigSnapshot.equals(((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot()));
> {code}
> The null check for partitionStateSerializer / 
> partitionStateSerializerConfigSnapshot is in combination with another clause.
> This may lead to NPE in the partitionStateSerializer.equals() call.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()

2017-07-12 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6493:
---

Assignee: mingleizhang

> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
> -
>
> Key: FLINK-6493
> URL: https://issues.apache.org/jira/browse/FLINK-6493
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> && ((partitionStateSerializer == null && ((Snapshot) 
> obj).getPartitionStateSerializer() == null)
>   || partitionStateSerializer.equals(((Snapshot) 
> obj).getPartitionStateSerializer()))
> && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot() == null)
>   || partitionStateSerializerConfigSnapshot.equals(((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot()));
> {code}
> The null check for partitionStateSerializer / 
> partitionStateSerializerConfigSnapshot is in combination with another clause.
> This may lead to NPE in the partitionStateSerializer.equals() call.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()

2017-07-12 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085159#comment-16085159
 ] 

mingleizhang commented on FLINK-5541:
-

Thanks for reporting this, Ted. I will take a look.

> Missing null check for localJar in FlinkSubmitter#submitTopology()
> --
>
> Key: FLINK-5541
> URL: https://issues.apache.org/jira/browse/FLINK-5541
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   if (localJar == null) {
> try {
>   for (final URL url : ((ContextEnvironment) 
> ExecutionEnvironment.getExecutionEnvironment())
>   .getJars()) {
> // TODO verify that there is only one jar
> localJar = new File(url.toURI()).getAbsolutePath();
>   }
> } catch (final URISyntaxException e) {
>   // ignore
> } catch (final ClassCastException e) {
>   // ignore
> }
>   }
>   logger.info("Submitting topology " + name + " in distributed mode with 
> conf " + serConf);
>   client.submitTopologyWithOpts(name, localJar, topology);
> {code}
> Since the try block may encounter URISyntaxException / ClassCastException, we 
> should check that localJar is not null before calling 
> submitTopologyWithOpts().
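
The check suggested above could look roughly like this. It is a sketch only: {{LocalJarCheck}} and {{requireLocalJar}} are hypothetical names, and throwing {{IllegalStateException}} is an assumption about how the failure should be reported.

```java
// Hypothetical sketch; the names below are placeholders, not Flink or Storm API.
final class LocalJarCheck {

    private LocalJarCheck() {}

    // Returns the jar path if it was resolved, otherwise fails with a clear message
    // before the value reaches the submission call.
    static String requireLocalJar(String localJar) {
        if (localJar == null) {
            throw new IllegalStateException(
                "Could not determine the local jar file; cannot submit the topology.");
        }
        return localJar;
    }
}
```

The submission code would then pass {{requireLocalJar(localJar)}} instead of the raw variable, so a failed lookup surfaces before the submit call rather than inside it.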



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()

2017-07-12 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085054#comment-16085054
 ] 

mingleizhang commented on FLINK-6493:
-

Actually, sorry. I don't think the null check is ineffective; it is just 
combined with another clause. The non-empty constructor uses 
{{Preconditions.checkNotNull}}, while the empty constructor has no null check. 
Even so, calling {{partitionStateSerializer.equals()}} would still throw an NPE 
if {{partitionStateSerializer}} is null and 
{{((Snapshot) obj).getPartitionStateSerializer() == null}} evaluates to false. 
So I believe that in this case we should return false instead of throwing an NPE.

> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
> -
>
> Key: FLINK-6493
> URL: https://issues.apache.org/jira/browse/FLINK-6493
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> && ((partitionStateSerializer == null && ((Snapshot) 
> obj).getPartitionStateSerializer() == null)
>   || partitionStateSerializer.equals(((Snapshot) 
> obj).getPartitionStateSerializer()))
> && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot() == null)
>   || partitionStateSerializerConfigSnapshot.equals(((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot()));
> {code}
> The null check for partitionStateSerializer / 
> partitionStateSerializerConfigSnapshot is in combination with another clause.
> This may lead to NPE in the partitionStateSerializer.equals() call.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2017-07-12 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6105:
---

Assignee: mingleizhang

> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.
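
A sketch of the suggested translation is shown below. {{InterruptAwareCalls}} is a hypothetical helper, not Flink code. {{InterruptedIOException}} has no constructor that takes a cause, so the cause is attached with {{initCause}}. Restoring the thread's interrupt flag is an additional assumption that the report does not mention.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.Callable;

// Hypothetical helper for illustration; not part of Flink.
final class InterruptAwareCalls {

    private InterruptAwareCalls() {}

    // Runs the call and reports an interruption as InterruptedIOException
    // instead of a plain IOException.
    static <T> T callTranslatingInterrupt(Callable<T> call, String message) throws IOException {
        try {
            return call.call();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            InterruptedIOException ioe = new InterruptedIOException(message);
            ioe.initCause(e);
            throw ioe;
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            throw new IOException(message, e);
        }
    }
}
```

Callers that catch {{IOException}} keep working because {{InterruptedIOException}} is a subclass, while callers that care about interruption can catch the more specific type.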



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()

2017-07-13 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-5541:
---

Assignee: mingleizhang

> Missing null check for localJar in FlinkSubmitter#submitTopology()
> --
>
> Key: FLINK-5541
> URL: https://issues.apache.org/jira/browse/FLINK-5541
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   if (localJar == null) {
> try {
>   for (final URL url : ((ContextEnvironment) 
> ExecutionEnvironment.getExecutionEnvironment())
>   .getJars()) {
> // TODO verify that there is only one jar
> localJar = new File(url.toURI()).getAbsolutePath();
>   }
> } catch (final URISyntaxException e) {
>   // ignore
> } catch (final ClassCastException e) {
>   // ignore
> }
>   }
>   logger.info("Submitting topology " + name + " in distributed mode with 
> conf " + serConf);
>   client.submitTopologyWithOpts(name, localJar, topology);
> {code}
> Since the try block may encounter URISyntaxException / ClassCastException, we 
> should check that localJar is not null before calling 
> submitTopologyWithOpts().



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-5987) Upgrade zookeeper dependency to 3.4.10

2017-07-13 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-5987:
---

Assignee: mingleizhang

> Upgrade zookeeper dependency to 3.4.10
> --
>
> Key: FLINK-5987
> URL: https://issues.apache.org/jira/browse/FLINK-5987
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> zookeeper 3.4.8 has been released.
> Among the fixes the following are desirable:
> ZOOKEEPER-706 large numbers of watches can cause session re-establishment to 
> fail 
> ZOOKEEPER-1797 PurgeTxnLog may delete data logs during roll
> This issue upgrades zookeeper dependency to 3.4.8



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-07-18 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-5486:
---

Assignee: mingleizhang

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.
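
The change proposed above can be sketched as follows. The class {{PendingFilesSketch}} and the map's value type are hypothetical simplifications, not the real {{BucketingSink}} state. The lock only helps if every other thread that modifies the map synchronizes on the same object.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical simplification of the restore logic; not the real BucketingSink.
final class PendingFilesSketch {

    private PendingFilesSketch() {}

    // Processes and then clears the map while holding the map's monitor throughout,
    // so no entry can be cleared by this code path while the handler still runs.
    static void handleAndClear(
            Map<Long, List<String>> pendingFilesPerCheckpoint,
            Consumer<Map<Long, List<String>>> handler) {
        synchronized (pendingFilesPerCheckpoint) {
            handler.accept(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }
}
```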



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7118) Remove hadoop1.x code in HadoopUtils

2017-07-07 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-7118:

Description: 
Since Flink no longer supports Hadoop 1.x, we should remove the related code. 
The code below resides in {{org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils}}

{code:java}
public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
    try {
        Class clazz;
        // for Hadoop 1.xx
        if(JobContext.class.isInterface()) {
            clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true,
                Thread.currentThread().getContextClassLoader());
        }
        // for Hadoop 2.xx
        else {
            clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true,
                Thread.currentThread().getContextClassLoader());
        }
        Constructor constructor = clazz.getConstructor(Configuration.class, JobID.class);
        JobContext context = (JobContext) constructor.newInstance(configuration, jobId);

        return context;
    } catch(Exception e) {
        throw new Exception("Could not create instance of JobContext.");
    }
}
{code}

And 


{code:java}
public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration,
        TaskAttemptID taskAttemptID) throws Exception {
    try {
        Class clazz;
        // for Hadoop 1.xx
        if(JobContext.class.isInterface()) {
            clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
        }
        // for Hadoop 2.xx
        else {
            clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
        }
        Constructor constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
        TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);

        return context;
    } catch(Exception e) {
        throw new Exception("Could not create instance of TaskAttemptContext.");
    }
}
{code}

  was:
Since flink no longer support hadoop 1.x version, we should remove it. 


{code:java}
public static JobContext instantiateJobContext(Configuration configuration, 
JobID jobId) throws Exception {
try {
Class clazz;
// for Hadoop 1.xx
if(JobContext.class.isInterface()) {
clazz = 
Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, 
Thread.currentThread().getContextClassLoader());
}
// for Hadoop 2.xx
else {
clazz = 
Class.forName("org.apache.hadoop.mapreduce.JobContext", true, 
Thread.currentThread().getContextClassLoader());
}
Constructor constructor = 
clazz.getConstructor(Configuration.class, JobID.class);
JobContext context = (JobContext) 
constructor.newInstance(configuration, jobId);

return context;
} catch(Exception e) {
throw new Exception("Could not create instance of 
JobContext.");
}
}
{code}

And 


{code:java}
public static TaskAttemptContext 
instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID 
taskAttemptID) throws Exception {
try {
Class clazz;
// for Hadoop 1.xx
if(JobContext.class.isInterface()) {
clazz = 
Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
}
// for Hadoop 2.xx
else {
clazz = 
Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
}
Constructor constructor = 
clazz.getConstructor(Configuration.class, TaskAttemptID.class);
TaskAttemptContext context = (TaskAttemptContext) 
constructor.newInstance(configuration, taskAttemptID);

return context;
} catch(Exception e) {
  

[jira] [Commented] (FLINK-7114) Remove the hardcoded way like classpath is written by a string

2017-07-07 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16078199#comment-16078199
 ] 

mingleizhang commented on FLINK-7114:
-

Thanks, Chesnay. That makes more sense.

> Remove the hardcoded way like classpath is written by a string
> --
>
> Key: FLINK-7114
> URL: https://issues.apache.org/jira/browse/FLINK-7114
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> For example, the {{HadoopFileSystem}} class has a field declared as 
> {{private static final String DEFAULT_HDFS_CLASS = 
> "org.apache.hadoop.hdfs.DistributedFileSystem";}}. Hardcoding the class name 
> as a string is fragile: once the classpath changes, the error is hard to 
> find. I suggest changing it to a more flexible approach.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7134) Remove hadoop1.x code in mapreduce.utils.HadoopUtils

2017-07-18 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092465#comment-16092465
 ] 

mingleizhang commented on FLINK-7134:
-

I will work on this soon.

> Remove hadoop1.x code in mapreduce.utils.HadoopUtils
> 
>
> Key: FLINK-7134
> URL: https://issues.apache.org/jira/browse/FLINK-7134
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> This issue is similar to FLINK-7118. I split the work into two issues so that 
> each one is clearer and easier to review.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7225) Cutoff exception message in StateDescriptor

2017-07-20 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094600#comment-16094600
 ] 

mingleizhang commented on FLINK-7225:
-

[~StephanEwen] Sorry, I don't understand exactly what you mean. Do you agree 
that the wording I suggested above, {{please make the type to be parameterized 
by using generics}}, is correct? Or do you mean that I need to add another 
{{StateDescriptor}} constructor that takes a {{TypeHint}} as a parameter, like 
the following?
{code:java}
protected StateDescriptor(String name, TypeHint type, T defaultValue) {
this.name = requireNonNull(name, "name must not be null");
requireNonNull(type, "type class must not be null");
try {
this.typeInfo = type.getTypeInfo();
{code}

> Cutoff exception message in StateDescriptor
> ---
>
> Key: FLINK-7225
> URL: https://issues.apache.org/jira/browse/FLINK-7225
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
> Fix For: 1.4.0, 1.3.2
>
>
> When the type extraction fails in the StateDescriptor constructor an 
> exception is thrown, but the message is cutoff and doesn't contain any advice 
> to remedy the situation.
> {code}
> try {
>   this.typeInfo = TypeExtractor.createTypeInfo(type);
>   } catch (Exception e) {
>   throw new RuntimeException("Cannot create full type 
> information based on the given class. If the type has generics, please", e);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4421) Make clocks and time measurements monotonous

2017-07-20 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095699#comment-16095699
 ] 

mingleizhang commented on FLINK-4421:
-

Hi, Stephan. Thanks for reporting this. I would like to know under what 
circumstances clock updates happen, since you said they can lead to negative 
durations. I would also like to read through {{Introduce a Clock utility for 
monotonous system timestamps}}.

For example, if {{System.currentTimeMillis()}} returns 12345 today and a call 
tomorrow returns 12145, the computed duration is negative.

> Make clocks and time measurements monotonous
> 
>
> Key: FLINK-4421
> URL: https://issues.apache.org/jira/browse/FLINK-4421
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Priority: Minor
>
> Currently, many places use {{System.currentTimeMillis()}} to acquire 
> timestamps or measure time intervals.
> Since this relies on the system clock, and the system clock is not 
> necessarily monotonous (in the presence of clock updates), this can lead to 
> negative duration and decreasing timestamps where increasing timestamps are 
> expected.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-4421) Make clocks and time measurements monotonous

2017-07-20 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095699#comment-16095699
 ] 

mingleizhang edited comment on FLINK-4421 at 7/21/17 2:46 AM:
--

Hi, [~StephanEwen]. Thanks for reporting this. I would like to know under what 
circumstances clock updates happen, since you said they can lead to negative 
durations. I would also like to read through {{Introduce a Clock utility for 
monotonous system timestamps}}.

For example, if {{System.currentTimeMillis()}} returns 12345 today and a call 
tomorrow returns 12145, the computed duration is negative.


was (Author: mingleizhang):
Hi, Stephan. Thanks for reporting this. I would like to know, Under what 
circumstances will lead to clock updates ? that is what you said can lead to 
negative duration. And I would like to try read {{Introduce a Clock utility for 
monotonous system timestamps}}.

Like today I run a code like {{System.currentTimeMillis()}} return 12345. 
Tomorrow, I run {{System.currentTimeMillis()}} return 12145. So, It can lead to 
negative duration.

> Make clocks and time measurements monotonous
> 
>
> Key: FLINK-4421
> URL: https://issues.apache.org/jira/browse/FLINK-4421
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Priority: Minor
>
> Currently, many places use {{System.currentTimeMillis()}} to acquire 
> timestamps or measure time intervals.
> Since this relies on the system clock, and the system clock is not 
> necessarily monotonous (in the presence of clock updates), this can lead to 
> negative duration and decreasing timestamps where increasing timestamps are 
> expected.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Issue Comment Deleted] (FLINK-4421) Make clocks and time measurements monotonous

2017-07-20 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-4421:

Comment: was deleted

(was: Hi, [~StephanEwen]. Thanks for reporting this. I would like to know, 
Under what circumstances will lead to clock updates ? that is what you said can 
lead to negative duration. And I would like to try read {{Introduce a Clock 
utility for monotonous system timestamps}}.

Like today I run a code like {{System.currentTimeMillis()}} return 12345. 
Tomorrow, I run {{System.currentTimeMillis()}} return 12145. So, It can lead to 
negative duration.)

> Make clocks and time measurements monotonous
> 
>
> Key: FLINK-4421
> URL: https://issues.apache.org/jira/browse/FLINK-4421
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Priority: Minor
>
> Currently, many places use {{System.currentTimeMillis()}} to acquire 
> timestamps or measure time intervals.
> Since this relies on the system clock, and the system clock is not 
> necessarily monotonous (in the presence of clock updates), this can lead to 
> negative duration and decreasing timestamps where increasing timestamps are 
> expected.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4423) Introduce a Clock utility for monotonous system timestamps

2017-07-20 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095743#comment-16095743
 ] 

mingleizhang commented on FLINK-4423:
-

[~StephanEwen] This is a very interesting topic indeed. One question I would 
like to ask and confirm: are we adding the Clock utility because we want to fix 
the problem that {{System.currentTimeMillis()}} is not monotonous? By 
remembering the max returned timestamp, the new function would have the same 
functionality as {{System.nanoTime()}}. Do I understand that correctly?

> Introduce a Clock utility for monotonous system timestamps
> --
>
> Key: FLINK-4423
> URL: https://issues.apache.org/jira/browse/FLINK-4423
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>
> I suggest to introduce a {{Clock}} class that provides a 
> {{currentTimeMillis()}} function that calls {{System.currentTimeMillis()}} 
> but also remembers the max returned timestamp so far. That way it would never 
> return decreasing timestamps.
> In the presence of clock backwards adjustments, the appearance would be that 
> time stands still for a while, until the clock has caught up with the 
> previous timestamp.
> Since we don't rely on this for measuring timeouts, but only for logging / 
> visualization / etc (see [FLINK-4422])  it should not mess up any distributed 
> system behavior.
> We would use this in places like the {{ExecutionGraph}}, where we record 
> timestamps for state transitions. That way, the utilities that derive charts 
> and times from the status timestamps would not be thrown off if timestamps 
> were decreasing when expected increasing.
> The same holds for ingestion time timestamps and for processing time triggers.
> NOTE: I would like some other opinions on that - it is a somewhat delicate 
> matter.
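
The proposal above could be sketched as follows. This is an assumption about one possible shape, not the design that was adopted: {{MonotonicClockSketch}} is a hypothetical name, and the injectable {{LongSupplier}} is added only so the behavior can be demonstrated without changing the system clock.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch of the proposed utility; not the actual Flink class.
final class MonotonicClockSketch {

    private final LongSupplier source;
    // Largest timestamp handed out so far.
    private final AtomicLong maxSeen = new AtomicLong(Long.MIN_VALUE);

    MonotonicClockSketch(LongSupplier source) {
        this.source = source;
    }

    // Clock backed by the system wall clock.
    static MonotonicClockSketch system() {
        return new MonotonicClockSketch(System::currentTimeMillis);
    }

    // Returns the source time, but never a value smaller than one returned before.
    long currentTimeMillis() {
        return maxSeen.accumulateAndGet(source.getAsLong(), Math::max);
    }
}
```

If the underlying clock is set back, the returned value stays at the previous maximum until the clock catches up, which matches the "time stands still" behavior described above.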



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5130) Remove Deprecated Methods from WindowedStream

2017-07-21 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16096110#comment-16096110
 ] 

mingleizhang commented on FLINK-5130:
-

If you agree, I won't work on this until 2.0.0 development begins.

> Remove Deprecated Methods from WindowedStream
> -
>
> Key: FLINK-5130
> URL: https://issues.apache.org/jira/browse/FLINK-5130
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Aljoscha Krettek
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-3992) Remove Key interface

2017-07-21 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16096123#comment-16096123
 ] 

mingleizhang commented on FLINK-3992:
-

[~Zentol] Could you please add more details here? Thanks. :)

> Remove Key interface
> 
>
> Key: FLINK-3992
> URL: https://issues.apache.org/jira/browse/FLINK-3992
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4048) Remove Hadoop dependencies from ExecutionEnvironment

2017-07-21 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16096120#comment-16096120
 ] 

mingleizhang commented on FLINK-4048:
-

I would like to work on it when 2.0.0 comes. For now, I won't.

> Remove Hadoop dependencies from ExecutionEnvironment
> 
>
> Key: FLINK-4048
> URL: https://issues.apache.org/jira/browse/FLINK-4048
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API
>Reporter: Robert Metzger
> Fix For: 2.0.0
>
>






[jira] [Updated] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem

2017-07-25 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-5789:

Summary: Make Bucketing Sink independent of Hadoop's FileSystem  (was: Make 
Bucketing Sink independent of Hadoop's FileSysten)

> Make Bucketing Sink independent of Hadoop's FileSystem
> --
>
> Key: FLINK-5789
> URL: https://issues.apache.org/jira/browse/FLINK-5789
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Stephan Ewen
> Fix For: 1.4.0
>
>
> The {{BucketingSink}} is hard wired to Hadoop's FileSystem, bypassing Flink's 
> file system abstraction.
> This causes several issues:
>   - The bucketing sink will behave differently than other file sinks with 
> respect to configuration
>   - Directly supported file systems (not through Hadoop) like the MapR File 
> System do not work in the same way with the BucketingSink as other file 
> systems
>   - The previous point is all the more problematic in the effort to make 
> Hadoop an optional dependency and to run in other stacks (Mesos, Kubernetes, 
> AWS, GCE, Azure) with ideally no Hadoop dependency.
> We should port the {{BucketingSink}} to use Flink's FileSystem classes.
> To support the *truncate* functionality that is needed for the exactly-once 
> semantics of the Bucketing Sink, we should extend Flink's FileSystem 
> abstraction to have the methods
>   - {{boolean supportsTruncate()}}
>   - {{void truncate(Path, long)}}
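
The two proposed methods could look roughly like the following. This is a minimal sketch under stated assumptions, not Flink's actual `FileSystem` API: the interface name `TruncatableFileSystem` and the class `LocalTruncatableFileSystem` are hypothetical, and `java.nio.file.Path` stands in for Flink's own `Path` type. Only the method names `supportsTruncate()` and `truncate(Path, long)` come from the issue text.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Hypothetical interface mirroring the two methods proposed in the issue.
 * It is not Flink's FileSystem class.
 */
interface TruncatableFileSystem {

    /** Whether this file system can shorten an existing file in place. */
    boolean supportsTruncate();

    /** Cuts the file back to the given length in bytes. */
    void truncate(Path file, long length) throws IOException;
}

/** Illustrative local-disk implementation backed by java.nio. */
final class LocalTruncatableFileSystem implements TruncatableFileSystem {

    @Override
    public boolean supportsTruncate() {
        return true;
    }

    @Override
    public void truncate(Path file, long length) throws IOException {
        if (length < 0) {
            throw new IllegalArgumentException("length must be non-negative");
        }
        // FileChannel#truncate discards bytes beyond 'length'; it leaves the
        // file unchanged if 'length' is not smaller than the current size.
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            channel.truncate(length);
        }
    }
}
```

On recovery, a sink would first check `supportsTruncate()` and, if it returns true, call `truncate` with the last length known to be valid, so that bytes written after the last successful checkpoint are dropped. What to do when truncation is unsupported is left open here.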




