[jira] [Commented] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()
[ https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15964530#comment-15964530 ] mingleizhang commented on FLINK-6143: - [~Zentol] Thanks for your help; the PR is ready now. Could you please help me review it? Many thanks go out to you. > Unprotected access to this.flink in LocalExecutor#endSession() > -- > > Key: FLINK-6143 > URL: https://issues.apache.org/jira/browse/FLINK-6143 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > public void endSession(JobID jobID) throws Exception { > LocalFlinkMiniCluster flink = this.flink; > if (flink != null) { > {code} > The flink field is not declared volatile and access to this.flink doesn't > hold the LocalExecutor.lock -- This message was sent by Atlassian JIRA (v6.3.15#6346)
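The fix discussed in FLINK-6143 can be sketched as follows. This is a simplified stand-in, not Flink's actual LocalExecutor: declaring the field volatile makes the read-into-a-local pattern in endSession() safe without holding the lock, because writes by other threads become visible to the unlocked read.

```java
// Hedged sketch (assumed names, not Flink's real classes): a volatile field
// makes the "copy to a local, then null-check" pattern thread-safe.
public class LocalExecutorSketch {

    // 'volatile' guarantees visibility of writes across threads, so the
    // unlocked read in endSession() always sees the latest value.
    private volatile Object miniCluster;

    public void start() {
        this.miniCluster = new Object();
    }

    /** Mirrors endSession(): read the field once into a local, then check. */
    public boolean endSession() {
        Object cluster = this.miniCluster; // single volatile read
        if (cluster != null) {
            // a real implementation would stop running jobs here
            return true;
        }
        return false;
    }
}
```

The alternative discussed in the issue is to hold LocalExecutor.lock around the read; volatile is the lighter-weight option when only visibility (not atomicity of compound actions) is needed.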
[jira] [Assigned] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster
[ https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-6275: --- Assignee: mingleizhang > Unprotected access to resourceManager in > YarnFlinkApplicationMasterRunner#runApplicationMaster > -- > > Key: FLINK-6275 > URL: https://issues.apache.org/jira/browse/FLINK-6275 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > The above is outside synchronized block. > @GuardedBy indicates that access should be protected. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster
[ https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15963873#comment-15963873 ] mingleizhang edited comment on FLINK-6275 at 4/11/17 7:03 AM: -- Let me look into what happened, and I will give a patch for this JIRA soon. was (Author: mingleizhang): Let me look into what happened. > Unprotected access to resourceManager in > YarnFlinkApplicationMasterRunner#runApplicationMaster > -- > > Key: FLINK-6275 > URL: https://issues.apache.org/jira/browse/FLINK-6275 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > The above is outside the synchronized block. > @GuardedBy indicates that access should be protected. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster
[ https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15963873#comment-15963873 ] mingleizhang commented on FLINK-6275: - Let me look into what happened. > Unprotected access to resourceManager in > YarnFlinkApplicationMasterRunner#runApplicationMaster > -- > > Key: FLINK-6275 > URL: https://issues.apache.org/jira/browse/FLINK-6275 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > The above is outside the synchronized block. > @GuardedBy indicates that access should be protected. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster
[ https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15963873#comment-15963873 ] mingleizhang edited comment on FLINK-6275 at 4/11/17 9:43 AM: -- [~tedyu] Could you please assign this JIRA to me? Let me look into what happened, and I will give a patch for this JIRA soon. was (Author: mingleizhang): Let me look into what happened, and I will give a patch for this JIRA soon. > Unprotected access to resourceManager in > YarnFlinkApplicationMasterRunner#runApplicationMaster > -- > > Key: FLINK-6275 > URL: https://issues.apache.org/jira/browse/FLINK-6275 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > The above is outside the synchronized block. > @GuardedBy indicates that access should be protected. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6245) Fix late side output documentation in Window documents.
[ https://issues.apache.org/jira/browse/FLINK-6245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15964067#comment-15964067 ] mingleizhang commented on FLINK-6245: - [~kkl0u] Could you please specify where the code lives in the Flink project, e.g. packageName.ClassName? > Fix late side output documentation in Window documents. > --- > > Key: FLINK-6245 > URL: https://issues.apache.org/jira/browse/FLINK-6245 > Project: Flink > Issue Type: Bug > Components: DataStream API, Documentation >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Priority: Minor > Fix For: 1.3.0 > > > There are two things that need to be done: > 1) in the syntax description in the beginning of the page, we should also > include the {{getSideOutput()}} > 2) in the "Getting late data as a side output" section and for the Java > example, it should not be a {{DataStream result ...}} but a > {{SingleOutputStreamOperator}}, if we want to get the late event side output. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()
[ https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15964153#comment-15964153 ] mingleizhang edited comment on FLINK-6143 at 4/11/17 10:37 AM: --- [~tedyu] Hi, could you please assign this JIRA to me? I will work on this soon. Thanks. was (Author: mingleizhang): [~tedyu] Hi, Could you please assgin this jira to me ? I will work on this soon. Thanks. > Unprotected access to this.flink in LocalExecutor#endSession() > -- > > Key: FLINK-6143 > URL: https://issues.apache.org/jira/browse/FLINK-6143 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Ted Yu >Priority: Minor > > {code} > public void endSession(JobID jobID) throws Exception { > LocalFlinkMiniCluster flink = this.flink; > if (flink != null) { > {code} > The flink field is not declared volatile and access to this.flink doesn't > hold the LocalExecutor.lock -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()
[ https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15964153#comment-15964153 ] mingleizhang commented on FLINK-6143: - [~tedyu] Hi, could you please assign this JIRA to me? I will work on this soon. Thanks. > Unprotected access to this.flink in LocalExecutor#endSession() > -- > > Key: FLINK-6143 > URL: https://issues.apache.org/jira/browse/FLINK-6143 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Ted Yu >Priority: Minor > > {code} > public void endSession(JobID jobID) throws Exception { > LocalFlinkMiniCluster flink = this.flink; > if (flink != null) { > {code} > The flink field is not declared volatile and access to this.flink doesn't > hold the LocalExecutor.lock -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6143) Unprotected access to this.flink in LocalExecutor#endSession()
[ https://issues.apache.org/jira/browse/FLINK-6143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966974#comment-15966974 ] mingleizhang commented on FLINK-6143: - [~StephanEwen] Hi, could you please take some time to review this patch? Many thanks go out to you. > Unprotected access to this.flink in LocalExecutor#endSession() > -- > > Key: FLINK-6143 > URL: https://issues.apache.org/jira/browse/FLINK-6143 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > public void endSession(JobID jobID) throws Exception { > LocalFlinkMiniCluster flink = this.flink; > if (flink != null) { > {code} > The flink field is not declared volatile and access to this.flink doesn't > hold the LocalExecutor.lock -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster
[ https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966977#comment-15966977 ] mingleizhang commented on FLINK-6275: - [~StephanEwen] Hi, could you please take some time to review this patch and give some advice on it? Thanks. > Unprotected access to resourceManager in > YarnFlinkApplicationMasterRunner#runApplicationMaster > -- > > Key: FLINK-6275 > URL: https://issues.apache.org/jira/browse/FLINK-6275 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > The above is outside the synchronized block. > @GuardedBy indicates that access should be protected. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970586#comment-15970586 ] mingleizhang edited comment on FLINK-6130 at 4/17/17 1:51 AM: -- [~tedyu] Could you please help review the code? What do you think of it? Thanks. {code} Future future; synchronized (lock) { // wait for resource manager to finish future = (Future) resourceManager.getTerminationFuture(); } Object object = future.get(); // everything started, we can wait until all is done or the process is killed LOG.info("YARN Application Master finished " + object); {code} was (Author: mingleizhang): [~tedyu] Could you please help review the code? What do you think of it? Thanks. {code} Future future; synchronized (lock) { // wait for resource manager to finish future = (Future) resourceManager.getTerminationFuture(); } Object object = future.get(); // everything started, we can wait until all is done or the process is killed LOG.info("YARN Application Master finished " + object); {code} > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970586#comment-15970586 ] mingleizhang commented on FLINK-6130: - [~tedyu] Could you please help review the code? What do you think of it? Thanks. {code} Future future; synchronized (lock) { // wait for resource manager to finish future = (Future) resourceManager.getTerminationFuture(); } Object object = future.get(); // everything started, we can wait until all is done or the process is killed LOG.info("YARN Application Master finished " + object); {code} > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
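The pattern proposed in the FLINK-6130 comments can be sketched as a runnable example. This is a hedged stand-in (ResourceManagerStub is illustrative, not Flink's actual ResourceManager API): the termination future is captured while holding the lock, but get() is called outside it, so the lock is not held while blocking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hedged sketch of "capture the future under the lock, block outside it".
public class TerminationFutureSketch {

    // Stand-in for the @GuardedBy-annotated resource manager field.
    static class ResourceManagerStub {
        private final CompletableFuture<String> termination = new CompletableFuture<>();
        Future<String> getTerminationFuture() { return termination; }
        void shutDown() { termination.complete("FINISHED"); }
    }

    private final Object lock = new Object();
    private final ResourceManagerStub resourceManager = new ResourceManagerStub();

    public String runApplicationMaster() throws Exception {
        Future<String> future;
        synchronized (lock) {
            // read the guarded field only while holding the lock ...
            future = resourceManager.getTerminationFuture();
        }
        resourceManager.shutDown(); // simulate the resource manager finishing
        // ... then block outside the lock, so other lock holders can proceed
        return future.get();
    }
}
```

Blocking on get() inside the synchronized block would risk a deadlock if the completing thread also needs the lock, which is why the capture and the wait are separated.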
[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970598#comment-15970598 ] mingleizhang commented on FLINK-6130: - [~tedyu] Yep. What do you think of the code below? Thanks. {code} @Override protected int runApplicationMaster(Configuration config) { Future future; synchronized (lock) { LOG.info("Starting High Availability Services"); // wait for resource manager to finish future = (Future) resourceManager.getTerminationFuture(); // (5) start the web monitor // TODO: add web monitor } Object object = future.get(); // everything started, we can wait until all is done or the process is killed LOG.info("YARN Application Master finished " + object); {code} > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970602#comment-15970602 ] mingleizhang commented on FLINK-6130: - [~tedyu] Thanks, I appreciate it again. If there are no more questions, I will open a PR soon. > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5611) Add QueryableStateException type
[ https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970729#comment-15970729 ] mingleizhang commented on FLINK-5611: - [~uce] Hi, what do you think of this? It lives under {code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks. {code} public class QueryableStateException extends Exception { private static final long serialVersionUID = 1L; public QueryableStateException() { super("Queryable state exception."); } } {code} > Add QueryableStateException type > > > Key: FLINK-5611 > URL: https://issues.apache.org/jira/browse/FLINK-5611 > Project: Flink > Issue Type: Sub-task > Components: Queryable State >Reporter: Ufuk Celebi >Assignee: mingleizhang >Priority: Minor > > We currently have some exceptions like {{UnknownJobManager}} and the like > that should be sub types of the to be introduced {{QueryableStateException}}. > Right now, they extend checked and unchecked Exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
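The hierarchy FLINK-5611 asks for can be sketched as below. This is a simplified stand-in, not Flink's actual code: a QueryableStateException base type, with an existing exception such as UnknownJobManager re-parented as a subtype so callers can catch the common base.

```java
// Hedged sketch of the proposed exception hierarchy (names illustrative).
class QueryableStateException extends Exception {
    private static final long serialVersionUID = 1L;

    public QueryableStateException(String message) {
        super(message);
    }
}

// Example subtype: previously a standalone checked exception, now it
// shares the QueryableStateException base type.
class UnknownJobManagerException extends QueryableStateException {
    private static final long serialVersionUID = 1L;

    public UnknownJobManagerException() {
        super("Unknown JobManager.");
    }
}
```

With this shape, call sites can narrow catches to the base type instead of handling each checked and unchecked exception separately, which is the motivation stated in the issue.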
[jira] [Assigned] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()
[ https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-5943: --- Assignee: mingleizhang > Unprotected access to haServices in > YarnFlinkApplicationMasterRunner#shutdown() > --- > > Key: FLINK-5943 > URL: https://issues.apache.org/jira/browse/FLINK-5943 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > protected void shutdown(ApplicationStatus status, String msg) { > // Need to clear the job state in the HA services before shutdown > try { > haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID()); > } > {code} > The access to haServices is without lock protection. > haServices may have been closed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()
[ https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970759#comment-15970759 ] mingleizhang commented on FLINK-5943: - [~tedyu] Hi Ted, what do you think of this? I have moved the access to haServices inside the synchronized block. Thanks. {code} protected void shutdown(ApplicationStatus status, String msg) { synchronized (lock) { // Need to clear the job state in the HA services before shutdown try { haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID()); } catch (Throwable t) { LOG.warn("Could not clear the job at the high-availability services", t); } if (jobManagerRunner != null) { try { jobManagerRunner.shutdown(); } catch (Throwable tt) { LOG.warn("Failed to stop the JobManagerRunner", tt); } } {code} > Unprotected access to haServices in > YarnFlinkApplicationMasterRunner#shutdown() > --- > > Key: FLINK-5943 > URL: https://issues.apache.org/jira/browse/FLINK-5943 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Ted Yu >Priority: Minor > > {code} > protected void shutdown(ApplicationStatus status, String msg) { > // Need to clear the job state in the HA services before shutdown > try { > haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID()); > } > {code} > The access to haServices is without lock protection. > haServices may have been closed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
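The race that FLINK-5943 guards against can be shown in a small runnable sketch. The service class here is a hypothetical stand-in for Flink's HighAvailabilityServices: use and close of the shared service both happen under the same lock, so shutdown() never touches a service that was concurrently closed.

```java
// Hedged sketch: guard both use and teardown of a shared service with one
// lock, making repeated or concurrent shutdown() calls safe.
public class ShutdownGuardSketch {

    // Stand-in for HighAvailabilityServices; throws if used after close().
    static class HaServicesStub {
        private boolean closed;
        void clearJob() {
            if (closed) throw new IllegalStateException("services already closed");
        }
        void close() { closed = true; }
    }

    private final Object lock = new Object();
    private HaServicesStub haServices = new HaServicesStub();

    public boolean shutdown() {
        synchronized (lock) {
            if (haServices == null) {
                return false; // already shut down by an earlier caller
            }
            haServices.clearJob(); // safe: nobody can close it concurrently
            haServices.close();
            haServices = null;
            return true;
        }
    }
}
```

Without the lock, a second thread could close the services between the null check and clearJob(), which is exactly the "haServices may have been closed" hazard the issue describes.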
[jira] [Created] (FLINK-6313) Some words were misspelled and LOG.error does not print the error
mingleizhang created FLINK-6313: --- Summary: Some words were misspelled and LOG.error does not print the error Key: FLINK-6313 URL: https://issues.apache.org/jira/browse/FLINK-6313 Project: Flink Issue Type: Bug Components: Queryable State Reporter: mingleizhang Assignee: mingleizhang Priority: Trivial I found some misspelled words and a LOG.error call that does not print the error information. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970588#comment-15970588 ] mingleizhang commented on FLINK-6130: - [~tedyu] Thanks, I appreciate it. I will open a PR for this JIRA soon. > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5611) Add QueryableStateException type
[ https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970729#comment-15970729 ] mingleizhang edited comment on FLINK-5611 at 4/17/17 6:35 AM: -- [~uce] Hi, what do you think of this? It lives under the package below. Thanks. {code}org.apache.flink.runtime.query.QueryableStateException{code} {code} public class QueryableStateException extends Exception { private static final long serialVersionUID = 1L; public QueryableStateException() { super("Queryable state exception."); } } {code} I really don't know what kinds of things can cause a QueryableStateException, so I couldn't give a specific message here; it is just "Queryable state exception." for now. was (Author: mingleizhang): [~uce] Hi, what do you think of this? It lives under {code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks. {code} public class QueryableStateException extends Exception { private static final long serialVersionUID = 1L; public QueryableStateException() { super("Queryable state exception."); } } {code} I really don't know what kinds of things can cause a QueryableStateException, so I couldn't give a specific message here; it is just "Queryable state exception." for now. > Add QueryableStateException type > > > Key: FLINK-5611 > URL: https://issues.apache.org/jira/browse/FLINK-5611 > Project: Flink > Issue Type: Sub-task > Components: Queryable State >Reporter: Ufuk Celebi >Assignee: mingleizhang >Priority: Minor > > We currently have some exceptions like {{UnknownJobManager}} and the like > that should be sub types of the to be introduced {{QueryableStateException}}. > Right now, they extend checked and unchecked Exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5611) Add QueryableStateException type
[ https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970729#comment-15970729 ] mingleizhang edited comment on FLINK-5611 at 4/17/17 6:34 AM: -- [~uce] Hi, what do you think of this? It lives under {code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks. {code} public class QueryableStateException extends Exception { private static final long serialVersionUID = 1L; public QueryableStateException() { super("Queryable state exception."); } } {code} I really don't know what kinds of things can cause a QueryableStateException, so I couldn't give a specific message here; it is just "Queryable state exception." for now. was (Author: mingleizhang): [~uce] Hi, what do you think of this? It lives under {code}org.apache.flink.runtime.query.QueryableStateException{code}. Thanks. {code} public class QueryableStateException extends Exception { private static final long serialVersionUID = 1L; public QueryableStateException() { super("Queryable state exception."); } } {code} > Add QueryableStateException type > > > Key: FLINK-5611 > URL: https://issues.apache.org/jira/browse/FLINK-5611 > Project: Flink > Issue Type: Sub-task > Components: Queryable State >Reporter: Ufuk Celebi >Assignee: mingleizhang >Priority: Minor > > We currently have some exceptions like {{UnknownJobManager}} and the like > that should be sub types of the to be introduced {{QueryableStateException}}. > Right now, they extend checked and unchecked Exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class
[ https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-6351: Priority: Major (was: Minor) > Refactoring YarnFlinkApplicationMasterRunner by combining > AbstractYarnFlinkApplicationMasterRunner in one class > --- > > Key: FLINK-6351 > URL: https://issues.apache.org/jira/browse/FLINK-6351 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: mingleizhang >Assignee: mingleizhang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class
[ https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-6351: Description: The current YarnFlinkApplicationMasterRunner inherits from AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We could combine the two classes and then instantiate the services and runtime components in the constructor of YarnFlinkApplicationMasterRunner to get rid of the lock in the run method. > Refactoring YarnFlinkApplicationMasterRunner by combining > AbstractYarnFlinkApplicationMasterRunner in one class > --- > > Key: FLINK-6351 > URL: https://issues.apache.org/jira/browse/FLINK-6351 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: mingleizhang >Assignee: mingleizhang > > The current YarnFlinkApplicationMasterRunner inherits from > AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We > could combine the two classes and then instantiate the services and runtime > components in the constructor of YarnFlinkApplicationMasterRunner to get > rid of the lock in the run method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
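The refactoring idea in FLINK-6351 rests on a Java memory model guarantee that can be shown in a short sketch. Class and field names here are illustrative only: when the services and runtime components become final fields initialized in the constructor, they are safely published, so run() no longer needs a lock to read them.

```java
// Hedged sketch: final fields set in the constructor are safely published
// under the Java memory model, removing the need for a lock in run().
public class MasterRunnerSketch {

    private final String haServices;      // stand-ins for the services and
    private final String resourceManager; // components that run() builds today

    public MasterRunnerSketch() {
        // all initialization happens here, before the object escapes
        this.haServices = "ha-services";
        this.resourceManager = "resource-manager";
    }

    /** No synchronized block needed: the fields never change after construction. */
    public String run() {
        return haServices + "/" + resourceManager;
    }
}
```

The guarantee only holds if `this` does not escape during construction (e.g. by registering a listener from the constructor), which a real refactoring would need to keep in mind.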
[jira] [Comment Edited] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class
[ https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977998#comment-15977998 ] mingleizhang edited comment on FLINK-6351 at 4/21/17 3:10 AM: -- [~till.rohrmann] Hi Till. Please check it out; what do you think? If there are no more questions, I will work on this soon. Thanks go out to you. was (Author: mingleizhang): [~till.rohrmann] Hi Till. Please check it out; what do you think? Thanks go out to you. > Refactoring YarnFlinkApplicationMasterRunner by combining > AbstractYarnFlinkApplicationMasterRunner in one class > --- > > Key: FLINK-6351 > URL: https://issues.apache.org/jira/browse/FLINK-6351 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: mingleizhang >Assignee: mingleizhang > > The current YarnFlinkApplicationMasterRunner inherits from > AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We > could combine the two classes and then instantiate the services and runtime > components in the constructor of YarnFlinkApplicationMasterRunner to get > rid of the lock in the run method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class
mingleizhang created FLINK-6351: --- Summary: Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class Key: FLINK-6351 URL: https://issues.apache.org/jira/browse/FLINK-6351 Project: Flink Issue Type: Improvement Components: YARN Reporter: mingleizhang Assignee: mingleizhang Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class
[ https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977998#comment-15977998 ] mingleizhang commented on FLINK-6351: - [~till.rohrmann] Hi Till. Please check it out; what do you think? Thanks go out to you. > Refactoring YarnFlinkApplicationMasterRunner by combining > AbstractYarnFlinkApplicationMasterRunner in one class > --- > > Key: FLINK-6351 > URL: https://issues.apache.org/jira/browse/FLINK-6351 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: mingleizhang >Assignee: mingleizhang > > The current YarnFlinkApplicationMasterRunner inherits from > AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We > could combine the two classes and then instantiate the services and runtime > components in the constructor of YarnFlinkApplicationMasterRunner to get > rid of the lock in the run method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6345) Migrate from Java serialization for ContinuousFileReaderOperator's state
[ https://issues.apache.org/jira/browse/FLINK-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-6345: --- Assignee: mingleizhang > Migrate from Java serialization for ContinuousFileReaderOperator's state > > > Key: FLINK-6345 > URL: https://issues.apache.org/jira/browse/FLINK-6345 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration > for {{ContinuousFileReaderOperator}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6345) Migrate from Java serialization for ContinuousFileReaderOperator's state
[ https://issues.apache.org/jira/browse/FLINK-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15978559#comment-15978559 ] mingleizhang commented on FLINK-6345: - I will try to solve this issue. > Migrate from Java serialization for ContinuousFileReaderOperator's state > > > Key: FLINK-6345 > URL: https://issues.apache.org/jira/browse/FLINK-6345 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > > See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration > for {{ContinuousFileReaderOperator}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-6130: --- Assignee: mingleizhang > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970332#comment-15970332 ] mingleizhang commented on FLINK-6130: - [~tedyu] Thanks. How should we use the return value stored from resourceManager#getTerminationFuture(), or should we do nothing with it? There also seems to be some overlap with FLINK-6275. > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
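The fix the issue description asks for (capture the value from the `@GuardedBy` field while holding the lock, then wait outside it) can be sketched as follows. `StubResourceManager` and the method names around it are illustrative stand-ins, not Flink's actual classes; only the locking pattern is the point.

```java
import java.util.concurrent.CompletableFuture;

public class TerminationFutureSketch {

    // Stand-in for Flink's ResourceManager, just to make the pattern runnable.
    static class StubResourceManager {
        private final CompletableFuture<Void> termination = new CompletableFuture<>();
        CompletableFuture<Void> getTerminationFuture() { return termination; }
        void shutDown() { termination.complete(null); }
    }

    private final Object lock = new Object();
    private StubResourceManager resourceManager; // conceptually @GuardedBy("lock")

    CompletableFuture<Void> startAndGetTerminationFuture() {
        final CompletableFuture<Void> terminationFuture;
        synchronized (lock) {
            resourceManager = new StubResourceManager();
            // Read the guarded field and capture the future while holding the lock ...
            terminationFuture = resourceManager.getTerminationFuture();
        }
        // ... so the (potentially long) wait itself happens outside the lock.
        return terminationFuture;
    }

    public static void main(String[] args) throws Exception {
        TerminationFutureSketch runner = new TerminationFutureSketch();
        CompletableFuture<Void> future = runner.startAndGetTerminationFuture();
        runner.resourceManager.shutDown();
        future.get(); // completes once the stub "resource manager" terminates
        System.out.println("terminated");
    }
}
```

This avoids both the unsynchronized field read and the pitfall of blocking on `get()` while holding the lock.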
[jira] [Assigned] (FLINK-6104) Resource leak in ListViaRangeSpeedMiniBenchmark
[ https://issues.apache.org/jira/browse/FLINK-6104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-6104: --- Assignee: mingleizhang > Resource leak in ListViaRangeSpeedMiniBenchmark > --- > > Key: FLINK-6104 > URL: https://issues.apache.org/jira/browse/FLINK-6104 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > final WriteOptions write_options = new WriteOptions() > .setSync(false) > .setDisableWAL(true); > {code} > WriteOptions ultimately extends AbstractNativeReference where: > {code} > public abstract class AbstractNativeReference implements AutoCloseable { > {code} > WriteOptions instance should be closed. > {code} > final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()); > {code} > RocksDB should be closed as well. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6104) Resource leak in ListViaRangeSpeedMiniBenchmark
[ https://issues.apache.org/jira/browse/FLINK-6104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970341#comment-15970341 ] mingleizhang commented on FLINK-6104: - I will open a PR for this JIRA soon. > Resource leak in ListViaRangeSpeedMiniBenchmark > --- > > Key: FLINK-6104 > URL: https://issues.apache.org/jira/browse/FLINK-6104 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > final WriteOptions write_options = new WriteOptions() > .setSync(false) > .setDisableWAL(true); > {code} > WriteOptions ultimately extends AbstractNativeReference where: > {code} > public abstract class AbstractNativeReference implements AutoCloseable { > {code} > WriteOptions instance should be closed. > {code} > final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()); > {code} > RocksDB should be closed as well. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
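Since both leaked objects are `AutoCloseable`, the natural shape of the fix is try-with-resources. `NativeHandle` below is a stand-in for RocksDB's `WriteOptions`/`RocksDB` classes so the sketch runs without the RocksDB JNI dependency; it only demonstrates the close-ordering and exception-safety the benchmark needs.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderSketch {

    static final List<String> closed = new ArrayList<>();

    // Stand-in for an AutoCloseable native resource such as WriteOptions or RocksDB.
    static class NativeHandle implements AutoCloseable {
        private final String name;
        NativeHandle(String name) { this.name = name; }
        @Override public void close() { closed.add(name); }
    }

    public static void main(String[] args) {
        // Resources declared first are closed last, and both are released
        // even if the benchmark body throws.
        try (NativeHandle writeOptions = new NativeHandle("WriteOptions");
             NativeHandle rocksDB = new NativeHandle("RocksDB")) {
            // ... benchmark body would use writeOptions and rocksDB here ...
        }
        System.out.println(closed); // closed in reverse declaration order
    }
}
```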
[jira] [Assigned] (FLINK-5611) Add QueryableStateException type
[ https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-5611: --- Assignee: mingleizhang > Add QueryableStateException type > > > Key: FLINK-5611 > URL: https://issues.apache.org/jira/browse/FLINK-5611 > Project: Flink > Issue Type: Sub-task > Components: Queryable State >Reporter: Ufuk Celebi >Assignee: mingleizhang >Priority: Minor > > We currently have some exceptions like {{UnknownJobManager}} and the like > that should be sub types of the to be introduced {{QueryableStateException}}. > Right now, they extend checked and unchecked Exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5611) Add QueryableStateException type
[ https://issues.apache.org/jira/browse/FLINK-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970385#comment-15970385 ] mingleizhang commented on FLINK-5611: - [~uce] Hi, I will work on this soon. Thanks ~ > Add QueryableStateException type > > > Key: FLINK-5611 > URL: https://issues.apache.org/jira/browse/FLINK-5611 > Project: Flink > Issue Type: Sub-task > Components: Queryable State >Reporter: Ufuk Celebi >Assignee: mingleizhang >Priority: Minor > > We currently have some exceptions like {{UnknownJobManager}} and the like > that should be sub types of the to be introduced {{QueryableStateException}}. > Right now, they extend checked and unchecked Exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972736#comment-15972736 ] mingleizhang commented on FLINK-6130: - [~till.rohrmann] Void? I see. I also think the {code}@GuardedBy("lock"){code} annotation might be wrong. Could we refine it as follows? Change {code} @GuardedBy("lock") private ResourceManager resourceManager; {code} to {code} private volatile ResourceManager resourceManager; {code} > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972736#comment-15972736 ] mingleizhang edited comment on FLINK-6130 at 4/18/17 2:10 PM: -- [~till.rohrmann] Void? I see. I also think the {code}@GuardedBy("lock"){code} annotation might be wrong. Could we refine it as follows? Change {code} @GuardedBy("lock") private ResourceManager resourceManager; {code} to {code} private volatile ResourceManager resourceManager; {code} [~Zentol] FYI. Thanks, I appreciate it. was (Author: mingleizhang): [~till.rohrmann] Void ? I see. I would think {code}@GuardedBy("lock){code} might a wrong as well. Could we do a refine like the following ? Change the code {code} @GuardedBy("lock") private ResourceManager resourceManager; {code} to {code} private volatile ResourceManager resourceManager; {code} > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972656#comment-15972656 ] mingleizhang commented on FLINK-6130: - [~till.rohrmann] Thanks for the review. Besides what [~Zentol] said, another way to get more information about the return value would be to log it: {code}LOG.info("YARN Application Master finished and the result is {}", result);{code} > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972690#comment-15972690 ] mingleizhang commented on FLINK-6130: - [~Zentol] Doesn't it return the status of the YARN Application Master, like failure or success? > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition
[ https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976012#comment-15976012 ] mingleizhang commented on FLINK-6264: - Hi, I don't think this is a bug when brokers fail and are restarted. In that case {code}unassignedPartitions.size() > 0{code} holds, which is why you get this exception. [~tzulitai] What is your opinion on this issue? > Kafka consumer fails if can't find leader for partition > --- > > Key: FLINK-6264 > URL: https://issues.apache.org/jira/browse/FLINK-6264 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0 >Reporter: Gyula Fora > > We have observed the following error many times when brokers failed/were > restarted: > java.lang.RuntimeException: Unable to find a leader for partitions: > [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, > KafkaPartitionHandle=[mytopic,10], offset=-1] > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474) > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974207#comment-15974207 ] mingleizhang edited comment on FLINK-6311 at 4/19/17 7:13 AM: -- [~tzulitai] Hi, here is the sample code; please check it out and let me know what you think. BTW, should we use a more detailed error message than just "mainThread is null"? If so, what kind of message should we put here? Thanks ~ :D {code} public void shutdownFetcher() { running = false; checkNotNull(mainThread, "mainThread is null."); mainThread.interrupt(); // the main thread may be sleeping for the discovery interval if (LOG.isInfoEnabled()) { LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); } checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null."); shardConsumersExecutor.shutdownNow(); } {code} was (Author: mingleizhang): [~tzulitai] Hi, here is the sample code I put here, please check it out and how do you think about it ? BTW, Should we put more error messages instead of just "mainThread is null" ? If it is really, what kinda message should we put it here? 
Thanks ~ :D {code} public void shutdownFetcher() { running = false; checkNotNull(mainThread, "mainThread is null."); mainThread.interrupt(); // the main thread may be sleeping for the discovery interval if (LOG.isInfoEnabled()) { LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); } checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null."); shardConsumersExecutor.shutdownNow(); } {code} > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by an user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6319) Add timeout when shutting SystemProcessingTimeService down
[ https://issues.apache.org/jira/browse/FLINK-6319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979854#comment-15979854 ] mingleizhang commented on FLINK-6319: - +1 I have to say, the {code}shutdownNow{code} method has its limitations. > Add timeout when shutting SystemProcessingTimeService down > -- > > Key: FLINK-6319 > URL: https://issues.apache.org/jira/browse/FLINK-6319 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Priority: Minor > > A user noted that we simply call {{shutdownNow}} on the > {{SystemProcessingTimeService's}} {{ScheduledThreadpoolExecutor}} when > calling {{SystemProcessingTimeService.shutdownService}}. {{shutdownNow}} will > halt all waiting tasks but it won't wait until the currently running tasks > have been completed. This can lead to unwanted runtime behaviours such as > wrong termination orders when shutting down tasks (as reported in > https://issues.apache.org/jira/browse/FLINK-4973?focusedCommentId=15965884=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15965884). > I propose to add a small timeout to wait for currently running tasks to > complete. Even though this problem cannot be completely solved since timer > tasks might take longer than the specified timeout, a timeout for waiting for > running tasks to complete will mitigate the problem. > We can do this by calling {{timerService.awaitTermination(timeout, > timeoutUnit);}} after the {{shutdownNow}} call. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
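The mitigation proposed in the issue can be sketched as follows. The timeout value and the surrounding class are illustrative; only the `shutdownNow()` + bounded `awaitTermination()` sequence reflects the proposal.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ShutdownWithTimeout {

    // shutdownNow() interrupts waiting tasks but does not wait for running ones,
    // so a bounded awaitTermination() follows it.
    static boolean shutdownService(ScheduledExecutorService timerService,
                                   long timeout, TimeUnit unit) throws InterruptedException {
        timerService.shutdownNow();
        // Wait up to `timeout` for tasks that were already running to finish;
        // returns false if they are still running when the timeout expires.
        return timerService.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timerService = Executors.newScheduledThreadPool(1);
        timerService.execute(() -> {
            try {
                Thread.sleep(100); // simulates a timer task in flight
            } catch (InterruptedException ignored) {
                // shutdownNow() interrupts the sleep; finish up and return
            }
        });
        boolean terminated = shutdownService(timerService, 1, TimeUnit.SECONDS);
        System.out.println("terminated cleanly: " + terminated);
    }
}
```

As the issue notes, this only mitigates the problem: a timer task that outlives the timeout will still be abandoned, and `awaitTermination` will report that by returning false.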
[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980624#comment-15980624 ] mingleizhang commented on FLINK-5855: - [~tedyu] Hi, Ted. I would like to ask about locking {code}pendingFilesPerCheckpoint{code} prior to the call to {code}handlePendingFilesForPreviousCheckpoints{code}. It may be that all elements have already been removed from the map by the time {code}handlePendingFilesForPreviousCheckpoints{code} is triggered. Could a problem occur here? CC [~till.rohrmann] > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-5855: --- Assignee: mingleizhang > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980624#comment-15980624 ] mingleizhang edited comment on FLINK-5855 at 4/24/17 1:50 AM: -- [~tedyu] Hi, Ted. I would like to ask about locking {code}pendingFilesPerCheckpoint{code} prior to the call to {code}handlePendingFilesForPreviousCheckpoints{code}. It may be that all elements have already been removed from the map by the time {code}handlePendingFilesForPreviousCheckpoints{code} is triggered. Could a problem occur here? CC [~till.rohrmann] was (Author: mingleizhang): [~tedyu] Hi, Ted. I would ask if {code}pendingFilesPerCheckpoint {code} prior to the call of {code}handlePendingFilesForPreviousCheckpoints{code}. It is a maybe that the it has alreay removed all elements from the map. Then trigger the {code}handlePendingFilesForPreviousCheckpoints{code}. Is there a problem should occur here ? CC [~till.rohrmann] > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980664#comment-15980664 ] mingleizhang edited comment on FLINK-5855 at 4/24/17 2:57 AM: -- Yep. That is right. So, follow your suggestion. I would write the code snippet like Correct ?{code}. synchronized (restoredState.pendingFilesPerCheckpoint) { restoredState.pendingFilesPerCheckpoint.clear(); } handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); {code} was (Author: mingleizhang): @Ted Yu, Yep. That is right. So, follow your suggestion. I would write the code snippet like {code}. Correct ? synchronized (restoredState.pendingFilesPerCheckpoint) { restoredState.pendingFilesPerCheckpoint.clear(); } handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); {code} > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980664#comment-15980664 ] mingleizhang commented on FLINK-5855: - @Ted Yu, Yep. That is right. So, follow your suggestion. I would write the code snippet like {code}. Correct ? synchronized (restoredState.pendingFilesPerCheckpoint) { restoredState.pendingFilesPerCheckpoint.clear(); } handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); {code} > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980664#comment-15980664 ] mingleizhang edited comment on FLINK-5855 at 4/24/17 2:57 AM: -- Yep. That is right. So, follow your suggestion. I would write the code snippet like Correct ?{code} synchronized (restoredState.pendingFilesPerCheckpoint) { restoredState.pendingFilesPerCheckpoint.clear(); } handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); {code} was (Author: mingleizhang): Yep. That is right. So, follow your suggestion. I would write the code snippet like Correct ?{code}. synchronized (restoredState.pendingFilesPerCheckpoint) { restoredState.pendingFilesPerCheckpoint.clear(); } handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); {code} > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Issue Comment Deleted] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-5855: Comment: was deleted (was: Yep. That is right. So, follow your suggestion. I would write the code snippet like Correct ?{code} synchronized (restoredState.pendingFilesPerCheckpoint) { restoredState.pendingFilesPerCheckpoint.clear(); } handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); {code}) > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980668#comment-15980668 ] mingleizhang commented on FLINK-5855: - I think this code snippet follows your suggestion. What do you think? {code} synchronized (restoredState.pendingFilesPerCheckpoint) { handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); restoredState.pendingFilesPerCheckpoint.clear(); } {code} > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980671#comment-15980671 ] mingleizhang commented on FLINK-5855: - Okay, thanks. I will open a PR for this. Honestly, I don't know why there are so many concurrency and locking issues like this in Flink, if they are real problems. > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
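The fix discussed in this thread (take the lock before `handlePendingFilesForPreviousCheckpoints()` runs and clear the map inside the same synchronized block) can be sketched as a runnable toy. The map contents and the handler body are placeholders; only the synchronization shape mirrors the suggestion.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PendingFilesSketch {

    static final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    static int handledCheckpoints = 0;

    // Placeholder for BucketingSink's handler; it must observe the map
    // before the clear() in the same critical section.
    static void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pending) {
        handledCheckpoints = pending.size();
    }

    public static void main(String[] args) {
        pendingFilesPerCheckpoint.put(1L, List.of("part-0-1.pending"));
        pendingFilesPerCheckpoint.put(2L, List.of("part-0-2.pending"));

        // Lock held across both the handler call and the clear(), so no other
        // thread can mutate the map between the two steps.
        synchronized (pendingFilesPerCheckpoint) {
            handlePendingFilesForPreviousCheckpoints(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }

        System.out.println(handledCheckpoints + " " + pendingFilesPerCheckpoint.size());
    }
}
```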
[jira] [Commented] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()
[ https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971966#comment-15971966 ] mingleizhang commented on FLINK-5943: - [~tedyu] Thanks and appreciate it. I will give a PR soon enough. > Unprotected access to haServices in > YarnFlinkApplicationMasterRunner#shutdown() > --- > > Key: FLINK-5943 > URL: https://issues.apache.org/jira/browse/FLINK-5943 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > protected void shutdown(ApplicationStatus status, String msg) { > // Need to clear the job state in the HA services before shutdown > try { > haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID()); > } > {code} > The access to haServices is without lock protection. > haServices may have been closed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974207#comment-15974207 ] mingleizhang edited comment on FLINK-6311 at 4/19/17 7:14 AM: -- [~tzulitai] Hi, here is the sample code; please check it out and let me know what you think. BTW, should we use a more detailed error message than just "mainThread is null"? If it is necessary, what kind of message should we put here? Thanks ~ :D {code} public void shutdownFetcher() { running = false; checkNotNull(mainThread, "mainThread is null."); mainThread.interrupt(); // the main thread may be sleeping for the discovery interval if (LOG.isInfoEnabled()) { LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); } checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null."); shardConsumersExecutor.shutdownNow(); } {code} was (Author: mingleizhang): [~tzulitai] Hi, here is the sample code I put here, please check it out and how do you think about it ? BTW, Should we put more error messages instead of just "mainThread is null" ? If it does really, what kinda messages should we put it here? 
Thanks ~ :D {code} public void shutdownFetcher() { running = false; checkNotNull(mainThread, "mainThread is null."); mainThread.interrupt(); // the main thread may be sleeping for the discovery interval if (LOG.isInfoEnabled()) { LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); } checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null."); shardConsumersExecutor.shutdownNow(); } {code} > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by an user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974012#comment-15974012 ] mingleizhang commented on FLINK-6311: - [~tzulitai] I just looked at FlinkKinesisConsumer, which lives in the {code}org.apache.flink.streaming.connectors.kinesis{code} package of the flink-connector-kinesis module. flink-connector-kinesis sits under the flink-connectors module, whose pom.xml does not contain {code}flink-connector-kinesis{code}. I think we should add the {code}flink-connector-kinesis{code} module to the flink-connectors pom.xml and then return to this issue. What do you think? > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai > > This was reported by an user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974207#comment-15974207 ] mingleizhang commented on FLINK-6311: - [~tzulitai] Hi, here is my sample code; please take a look and let me know what you think. By the way, should we use a more detailed error message instead of just "mainThread is null"? If so, what kind of message should we put here? Thanks ~ :D {code} public void shutdownFetcher() { running = false; checkNotNull(mainThread, "mainThread is null."); mainThread.interrupt(); // the main thread may be sleeping for the discovery interval if (LOG.isInfoEnabled()) { LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); } checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null."); shardConsumersExecutor.shutdownNow(); } {code} > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by a user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
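Note that {{checkNotNull}} in the snippet above would itself throw an NPE when the field is null, whereas the issue asks for shutdown to be safe in that case. A plain null guard makes shutdown a harmless no-op when the source was closed before run(). The class below is a minimal illustrative stand-in for {{KinesisDataFetcher}}, not Flink's actual code:

```java
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for KinesisDataFetcher: shows null-guarded shutdown.
class FetcherShutdownSketch {
    private volatile boolean running = true;

    // both fields stay null if the source is closed before run() ever starts
    private Thread mainThread;
    private ExecutorService shardConsumersExecutor;

    public void shutdownFetcher() {
        running = false;
        if (mainThread != null) {
            // the main thread may be sleeping for the discovery interval
            mainThread.interrupt();
        }
        if (shardConsumersExecutor != null) {
            shardConsumersExecutor.shutdownNow();
        }
    }

    public boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        FetcherShutdownSketch fetcher = new FetcherShutdownSketch();
        fetcher.shutdownFetcher(); // closed before run(): no NPE, just a clean stop
        System.out.println("running after shutdown: " + fetcher.isRunning());
    }
}
```

Calling {{shutdownFetcher()}} before anything was started then simply flips the running flag and skips the null fields.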
[jira] [Comment Edited] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973937#comment-15973937 ] mingleizhang edited comment on FLINK-6130 at 4/19/17 2:36 AM: -- [~Zentol] [~till.rohrmann] That makes sense to me now. So, I have decided that the previous practice {code}Object result = future.value().get();{code} is meaningless, as I cannot get any useful message from it. Thanks, I appreciate it. was (Author: mingleizhang): [~Zentol] [~till.rohrmann] That makes sense to me now. So, I just have decided that the previous practice {code}Object result = future.value().get();{code} is meaningless as I can not any useful message from it. Thanks and appreciate it. > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-6311: --- Assignee: mingleizhang > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by a user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974124#comment-15974124 ] mingleizhang commented on FLINK-6311: - [~tzulitai] Thanks for the useful information; I really appreciate it. Yes, I would like to work on this and will start on it soon. > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by a user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973937#comment-15973937 ] mingleizhang commented on FLINK-6130: - [~Zentol] [~till.rohrmann] That makes sense to me now. So, I have decided that the previous practice {code}Object result = future.value().get();{code} is meaningless, as I cannot get any useful message from it. Thanks, I appreciate it. > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
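The fix the issue describes (read the termination future while holding the lock, then wait on it outside the lock) can be sketched as follows. {{ResourceManagerStub}} is a hypothetical stand-in for illustration, not Flink's actual resource manager:

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the suggested fix: store the termination future inside the
// synchronized block, then block on it without holding the lock.
class TerminationFutureSketch {

    // Hypothetical stand-in for the resource manager in the snippet above.
    static class ResourceManagerStub {
        private final CompletableFuture<Integer> termination = new CompletableFuture<>();

        CompletableFuture<Integer> getTerminationFuture() {
            return termination;
        }

        void terminate(int exitCode) {
            termination.complete(exitCode);
        }
    }

    private final Object lock = new Object();

    int runApplicationMaster(ResourceManagerStub resourceManager) {
        CompletableFuture<Integer> terminationFuture;
        synchronized (lock) {
            // ... start services while holding the lock ...
            // capture the future while still holding the lock
            terminationFuture = resourceManager.getTerminationFuture();
        }
        // wait for the resource manager to finish, outside the lock
        return terminationFuture.join();
    }

    public static void main(String[] args) {
        ResourceManagerStub rm = new ResourceManagerStub();
        rm.terminate(0); // complete immediately so the demo does not block
        System.out.println("exit code: " + new TerminationFutureSketch().runApplicationMaster(rm));
    }
}
```

The point of the pattern is that only the field read happens under the lock; the potentially long {{join()}} never blocks other threads that need the lock.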
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974385#comment-15974385 ] mingleizhang commented on FLINK-6311: - [~tzulitai] Thanks, I will open a PR for this JIRA soon. I really appreciate it. > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by a user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974012#comment-15974012 ] mingleizhang edited comment on FLINK-6311 at 4/19/17 10:02 AM: --- [~tzulitai] I just looked at FlinkKinesisConsumer, which is under the package {code}org.apache.flink.streaming.connectors.kinesis{code} in the flink-connector-kinesis module. flink-connector-kinesis sits under the flink-connectors module, whose pom.xml does not contain {code}flink-connector-kinesis{code}. I think we should add the {code}flink-connector-kinesis{code} module to the flink-connectors pom.xml and then return to this issue. What do you think? was (Author: mingleizhang): [~tzulitai] I just watch FlinkKinesisConsumer which under the package of {code}org.apache.flink.streaming.connectors.kinesis{code} in the module flink-connector-kinesis. And the flink-connector-kinesis is under the module of flink-connectors in which pom.xml dont contains the {code}flink-connector-kinesis{code}. I would think we should add the module {code} flink-connector-kinesis {code} in flink-connectors pom.xml and then return to this issue. How do you think of this ? > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by a user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
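For illustration, the change proposed in the comment above would amount to a single {{<module>}} entry in the aggregator pom. This is only a sketch: the actual flink-connectors/pom.xml may list the Kinesis connector under a Maven profile because of its dependency licensing, so this should be verified before adding the entry unconditionally.

```xml
<!-- Sketch: flink-connectors/pom.xml (aggregator), assuming the module
     really is missing from the default <modules> list. -->
<modules>
  <!-- ... existing connector modules ... -->
  <module>flink-connector-kinesis</module>
</modules>
```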
[jira] [Updated] (FLINK-7302) Failed to run CorrelateITCase class under windows environment
[ https://issues.apache.org/jira/browse/FLINK-7302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-7302: Description: Error:(220, 5) reference to UserDefinedFunctionTestUtils is ambiguous; it is imported twice in the same scope by import org.apache.flink.table.utils._ and import org.apache.flink.table.runtime.utils._ UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> "#")) Error:(240, 5) reference to UserDefinedFunctionTestUtils is ambiguous; it is imported twice in the same scope by import org.apache.flink.table.utils._ and import org.apache.flink.table.runtime.utils._ UserDefinedFunctionTestUtils.setJobParameters( Error:(107, 5) reference to UserDefinedFunctionTestUtils is ambiguous; it is imported twice in the same scope by import org.apache.flink.table.utils._ and import org.apache.flink.table.runtime.utils._ UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> " ")) Error:(129, 5) reference to UserDefinedFunctionTestUtils is ambiguous; it is imported twice in the same scope by import org.apache.flink.table.utils._ and import org.apache.flink.table.runtime.utils._ UserDefinedFunctionTestUtils.setJobParameters( was: Error:(220, 5) reference to UserDefinedFunctionTestUtils is ambiguous; it is imported twice in the same scope by import org.apache.flink.table.utils._ and import org.apache.flink.table.runtime.utils._ UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> "#")) > Failed to run CorrelateITCase class under windows environment > - > > Key: FLINK-7302 > URL: https://issues.apache.org/jira/browse/FLINK-7302 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Environment: Windows 7 >Reporter: mingleizhang >Assignee: mingleizhang > > Error:(220, 5) reference to UserDefinedFunctionTestUtils is ambiguous; > it is imported twice in the same scope by > import org.apache.flink.table.utils._ > and import org.apache.flink.table.runtime.utils._ > UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" > -> "#")) > Error:(240, 5) reference to UserDefinedFunctionTestUtils is ambiguous; > it is imported twice in the same scope by > import org.apache.flink.table.utils._ > and import org.apache.flink.table.runtime.utils._ > UserDefinedFunctionTestUtils.setJobParameters( > Error:(107, 5) reference to UserDefinedFunctionTestUtils is ambiguous; > it is imported twice in the same scope by > import org.apache.flink.table.utils._ > and import org.apache.flink.table.runtime.utils._ > UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" > -> " ")) > Error:(129, 5) reference to UserDefinedFunctionTestUtils is ambiguous; > it is imported twice in the same scope by > import org.apache.flink.table.utils._ > and import org.apache.flink.table.runtime.utils._ > UserDefinedFunctionTestUtils.setJobParameters( -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7302) Failed to run CorrelateITCase class under windows environment
mingleizhang created FLINK-7302: --- Summary: Failed to run CorrelateITCase class under windows environment Key: FLINK-7302 URL: https://issues.apache.org/jira/browse/FLINK-7302 Project: Flink Issue Type: Bug Components: Table API & SQL Environment: Windows 7 Reporter: mingleizhang Assignee: mingleizhang Error:(220, 5) reference to UserDefinedFunctionTestUtils is ambiguous; it is imported twice in the same scope by import org.apache.flink.table.utils._ and import org.apache.flink.table.runtime.utils._ UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> "#")) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7297) Instable Kafka09ProducerITCase.testCustomPartitioning test case
[ https://issues.apache.org/jira/browse/FLINK-7297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107062#comment-16107062 ] mingleizhang commented on FLINK-7297: - I don't know why it takes several hours to load the job log every time. I have not managed to see it because it takes so long; it is always loading. > Instable Kafka09ProducerITCase.testCustomPartitioning test case > --- > > Key: FLINK-7297 > URL: https://issues.apache.org/jira/browse/FLINK-7297 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > There seems to be a test instability of > {{Kafka09ProducerITCase>KafkaProducerTestBase.testCustomPartitioning}} on > Travis. > https://travis-ci.org/apache/flink/jobs/258538636 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7297) Instable Kafka09ProducerITCase.testCustomPartitioning test case
[ https://issues.apache.org/jira/browse/FLINK-7297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107062#comment-16107062 ] mingleizhang edited comment on FLINK-7297 at 7/31/17 9:56 AM: -- I don't know why it takes several hours to load the job log every time. I have not managed to see it because it takes so long; it is always loading. I have never succeeded in seeing the log before. was (Author: mingleizhang): Every time, I dont know why It would take me several hours to see Job log. And I can not see it as it takes me so long time. Always loading > Instable Kafka09ProducerITCase.testCustomPartitioning test case > --- > > Key: FLINK-7297 > URL: https://issues.apache.org/jira/browse/FLINK-7297 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > There seems to be a test instability of > {{Kafka09ProducerITCase>KafkaProducerTestBase.testCustomPartitioning}} on > Travis. > https://travis-ci.org/apache/flink/jobs/258538636 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7297) Instable Kafka09ProducerITCase.testCustomPartitioning test case
[ https://issues.apache.org/jira/browse/FLINK-7297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107059#comment-16107059 ] mingleizhang commented on FLINK-7297: - It is just an XML file instead. > Instable Kafka09ProducerITCase.testCustomPartitioning test case > --- > > Key: FLINK-7297 > URL: https://issues.apache.org/jira/browse/FLINK-7297 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > There seems to be a test instability of > {{Kafka09ProducerITCase>KafkaProducerTestBase.testCustomPartitioning}} on > Travis. > https://travis-ci.org/apache/flink/jobs/258538636 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7092) Shutdown ResourceManager components properly
[ https://issues.apache.org/jira/browse/FLINK-7092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106216#comment-16106216 ] mingleizhang commented on FLINK-7092: - Thanks, Till. I have been back home these days. :) I am a bit shy. > Shutdown ResourceManager components properly > > > Key: FLINK-7092 > URL: https://issues.apache.org/jira/browse/FLINK-7092 > Project: Flink > Issue Type: Sub-task > Components: Mesos >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > The {{MesosResourceManager}} starts internally a {{TaskMonitor}}, > {{LaunchCoordinator}}, {{ConnectionMonitor}} and a > {{ReconciliationCoordinator}}. These components have to be properly shut down > when the {{MesosResourceManager}} closes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7302) Failed to run CorrelateITCase class under windows environment
[ https://issues.apache.org/jira/browse/FLINK-7302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang resolved FLINK-7302. - Resolution: Won't Fix > Failed to run CorrelateITCase class under windows environment > - > > Key: FLINK-7302 > URL: https://issues.apache.org/jira/browse/FLINK-7302 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Environment: Windows 7 >Reporter: mingleizhang >Assignee: mingleizhang > > Error:(220, 5) reference to UserDefinedFunctionTestUtils is ambiguous; > it is imported twice in the same scope by > import org.apache.flink.table.utils._ > and import org.apache.flink.table.runtime.utils._ > UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" > -> "#")) > Error:(240, 5) reference to UserDefinedFunctionTestUtils is ambiguous; > it is imported twice in the same scope by > import org.apache.flink.table.utils._ > and import org.apache.flink.table.runtime.utils._ > UserDefinedFunctionTestUtils.setJobParameters( > Error:(107, 5) reference to UserDefinedFunctionTestUtils is ambiguous; > it is imported twice in the same scope by > import org.apache.flink.table.utils._ > and import org.apache.flink.table.runtime.utils._ > UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" > -> " ")) > Error:(129, 5) reference to UserDefinedFunctionTestUtils is ambiguous; > it is imported twice in the same scope by > import org.apache.flink.table.utils._ > and import org.apache.flink.table.runtime.utils._ > UserDefinedFunctionTestUtils.setJobParameters( -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation
[ https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114213#comment-16114213 ] mingleizhang commented on FLINK-6995: - Ahhha. I see. Thanks ~ > Add a warning to outdated documentation > --- > > Key: FLINK-6995 > URL: https://issues.apache.org/jira/browse/FLINK-6995 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Timo Walther >Assignee: mingleizhang > > When I search for "flink yarn" by Google, the first result is an outdated 0.8 > release documentation page. We should add a warning to outdated documentation > pages. > There are other problems as well: > The main page only links to 1.3 and 1.4 but the flink-docs-master > documentation links to 1.3, 1.2, 1.1, and 1.0. But each of those packages > only links to older releases so if a user arrives on a 1.2 page they won't > see 1.3. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation
[ https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114134#comment-16114134 ] mingleizhang commented on FLINK-6995: - I am planning to check out a new branch based on the {{release-1.2}} branch and then work on that branch. If I am wrong, please let me know. > Add a warning to outdated documentation > --- > > Key: FLINK-6995 > URL: https://issues.apache.org/jira/browse/FLINK-6995 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Timo Walther >Assignee: mingleizhang > > When I search for "flink yarn" by Google, the first result is an outdated 0.8 > release documentation page. We should add a warning to outdated documentation > pages. > There are other problems as well: > The main page only links to 1.3 and 1.4 but the flink-docs-master > documentation links to 1.3, 1.2, 1.1, and 1.0. But each of those packages > only links to older releases so if a user arrives on a 1.2 page they won't > see 1.3. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation
[ https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114184#comment-16114184 ] mingleizhang commented on FLINK-6995: - Hi, [~alpinegizmo] You said the warning has to be enabled by setting {{site.is_latest}} to false. My question is: where is this set in our code? I checked the {{release-1.2}} branch. The warning already exists, and https://flink.apache.org/q/stable-docs.html already points to 1.3. I think this work may already be finished, but I am not sure. Could you take a look at it? > Add a warning to outdated documentation > --- > > Key: FLINK-6995 > URL: https://issues.apache.org/jira/browse/FLINK-6995 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Timo Walther >Assignee: mingleizhang > > When I search for "flink yarn" by Google, the first result is an outdated 0.8 > release documentation page. We should add a warning to outdated documentation > pages. > There are other problems as well: > The main page only links to 1.3 and 1.4 but the flink-docs-master > documentation links to 1.3, 1.2, 1.1, and 1.0. But each of those packages > only links to older releases so if a user arrives on a 1.2 page they won't > see 1.3. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7162) Tests should not write outside 'target' directory.
mingleizhang created FLINK-7162: --- Summary: Tests should not write outside 'target' directory. Key: FLINK-7162 URL: https://issues.apache.org/jira/browse/FLINK-7162 Project: Flink Issue Type: Bug Components: Local Runtime Reporter: mingleizhang Assignee: mingleizhang -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7162) Tests should not write outside 'target' directory.
[ https://issues.apache.org/jira/browse/FLINK-7162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-7162: Issue Type: Improvement (was: Bug) > Tests should not write outside 'target' directory. > -- > > Key: FLINK-7162 > URL: https://issues.apache.org/jira/browse/FLINK-7162 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: mingleizhang >Assignee: mingleizhang > > A few tests use Files.createTempDir() from Guava package, but do not set > java.io.tmpdir system property. Thus the temp directory is created in > unpredictable places and is not being cleaned up by {{mvn clean}}. > This was probably introduced in {{JobManagerStartupTest}} and then replicated > in {{BlobUtilsTest}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7162) Tests should not write outside 'target' directory.
[ https://issues.apache.org/jira/browse/FLINK-7162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-7162: Description: A few tests use Files.createTempDir() from Guava package, but do not set java.io.tmpdir system property. Thus the temp directory is created in unpredictable places and is not being cleaned up by {{mvn clean}}. This was probably introduced in {{JobManagerStartupTest}} and then replicated in {{BlobUtilsTest}}. > Tests should not write outside 'target' directory. > -- > > Key: FLINK-7162 > URL: https://issues.apache.org/jira/browse/FLINK-7162 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: mingleizhang >Assignee: mingleizhang > > A few tests use Files.createTempDir() from Guava package, but do not set > java.io.tmpdir system property. Thus the temp directory is created in > unpredictable places and is not being cleaned up by {{mvn clean}}. > This was probably introduced in {{JobManagerStartupTest}} and then replicated > in {{BlobUtilsTest}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
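A sketch of the direction the description suggests: create test temp directories under an explicit base (for example a path inside {{target}}) with the JDK's {{Files.createTempDirectory}}, instead of Guava's {{Files.createTempDir()}}, which lands wherever {{java.io.tmpdir}} happens to point. The helper name and the base path used here are illustrative assumptions, not Flink's actual test utilities:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch: temp directories for tests go under an explicit, cleanable base
// directory, so 'mvn clean' (which wipes 'target') removes them.
class TempDirSketch {

    static Path createTestTempDir(Path base) {
        try {
            // ensure the base (e.g. target/test-tmp) exists, then create a
            // uniquely named directory inside it
            Files.createDirectories(base);
            return Files.createTempDirectory(base, "flink-test-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // illustrative base; a real build would use something like target/test-tmp
        Path base = Paths.get(System.getProperty("java.io.tmpdir"), "target-sketch");
        Path dir = createTestTempDir(base);
        System.out.println("created " + dir + " under " + base);
    }
}
```

Because every directory is created inside the chosen base, nothing is scattered into unpredictable system locations.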
[jira] [Updated] (FLINK-6934) Consider moving LRUCache and it's test classes
[ https://issues.apache.org/jira/browse/FLINK-6934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-6934: Summary: Consider moving LRUCache and its test classes (was: Consider moving LRUCache class) > Consider moving LRUCache and its test classes > -- > > Key: FLINK-6934 > URL: https://issues.apache.org/jira/browse/FLINK-6934 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.4.0 > > > The LRUCache class is not used any more, so I would suggest removing it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6378) Implement FLIP-6 Flink-on-Mesos
[ https://issues.apache.org/jira/browse/FLINK-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081702#comment-16081702 ] mingleizhang commented on FLINK-6378: - Why are we going to remove the Akka dependency? > Implement FLIP-6 Flink-on-Mesos > --- > > Key: FLINK-6378 > URL: https://issues.apache.org/jira/browse/FLINK-6378 > Project: Flink > Issue Type: New Feature > Components: Mesos >Reporter: Eron Wright > Labels: flip-6 > > This is the parent issue for implementing Flink on Mesos using the new FLIP-6 > architecture. > This covers individual jobs running as Mesos frameworks, where the framework > and job lifetime are coupled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7139) Refine currentTimeMillis to nanoTime
[ https://issues.apache.org/jira/browse/FLINK-7139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079084#comment-16079084 ] mingleizhang commented on FLINK-7139: - Hello, [~Zentol] What do you think ? > Refine currentTimeMillis to nanoTime > > > Key: FLINK-7139 > URL: https://issues.apache.org/jira/browse/FLINK-7139 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: mingleizhang >Assignee: mingleizhang >Priority: Minor > > When measuring intervals of time it's much better & safer to use > {{System.nanoTime()}} than {{System.currentTimeMillis()}}. The latter is > affected by system clock drift and adjustments (like by system time daemon) > and the former is much more precise and stable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7139) Refine currentTimeMillis to nanoTime
mingleizhang created FLINK-7139: --- Summary: Refine currentTimeMillis to nanoTime Key: FLINK-7139 URL: https://issues.apache.org/jira/browse/FLINK-7139 Project: Flink Issue Type: Improvement Components: Local Runtime Reporter: mingleizhang Assignee: mingleizhang Priority: Minor When measuring intervals of time it's much better & safer to use {{System.nanoTime()}} than {{System.currentTimeMillis()}}. The latter is affected by system clock drift and adjustments (like by system time daemon) and the former is much more precise and stable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
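The proposal above can be illustrated with a small interval-measuring helper: {{System.nanoTime()}} is monotonic and meaningful only as a difference between two calls, while {{System.currentTimeMillis()}} can jump when the wall clock is adjusted. This is an illustrative sketch, not Flink code:

```java
import java.util.concurrent.TimeUnit;

// Sketch of the proposed pattern: measure elapsed intervals as a difference
// of two System.nanoTime() readings, never with System.currentTimeMillis().
class ElapsedTimeSketch {

    static long elapsedMillis(Runnable task) {
        long start = System.nanoTime();           // monotonic start mark
        task.run();
        long elapsedNanos = System.nanoTime() - start;
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    public static void main(String[] args) {
        long ms = elapsedMillis(() -> {
            long sum = 0;
            for (int i = 0; i < 1_000_000; i++) {
                sum += i;   // a small busy task to time
            }
        });
        System.out.println("task took about " + ms + " ms");
    }
}
```

Note that a single {{nanoTime()}} value has no relation to wall-clock time; only differences between two readings are meaningful, which is exactly what interval measurement needs.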
[jira] [Created] (FLINK-7134) Remove hadoop1.x code in mapreduce.utils.HadoopUtils
mingleizhang created FLINK-7134: --- Summary: Remove hadoop1.x code in mapreduce.utils.HadoopUtils Key: FLINK-7134 URL: https://issues.apache.org/jira/browse/FLINK-7134 Project: Flink Issue Type: Improvement Components: Java API Reporter: mingleizhang Assignee: mingleizhang This jira is similar to FLINK-7118. And for a clearer format and a review, I separated the two jira. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7121) Remove the hardcoded way in core
[ https://issues.apache.org/jira/browse/FLINK-7121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16078189#comment-16078189 ] mingleizhang commented on FLINK-7121: - OK. > Remove the hardcoded way in core > > > Key: FLINK-7121 > URL: https://issues.apache.org/jira/browse/FLINK-7121 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: mingleizhang >Assignee: mingleizhang > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085030#comment-16085030 ] mingleizhang commented on FLINK-6493: - Thanks, Ted. I guess {{partitionStateSerializerConfigSnapshot.equals()}} may also lead to an NPE. > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Priority: Minor > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085054#comment-16085054 ] mingleizhang edited comment on FLINK-6493 at 7/13/17 2:29 AM: -- Actually, sorry. I don't think the null check is ineffective: it is just combined with another clause, and the non-empty constructor has {{Preconditions.checkNotNull}}. The empty constructor does not have a null check. Even so, it would still throw an NPE when calling {{partitionStateSerializer.equals()}} if {{partitionStateSerializer}} is null and {{((Snapshot) obj).getPartitionStateSerializer() == null}} returns false. So I believe in this case we should return false instead of throwing an NPE. was (Author: mingleizhang): Actually, Sorry. I dont think null check is ineffective, as it just combination with another clause which belongs to not empty constructor has {{Preconditions.checkNotNull}}. In empty constructor, it does not have a null check. Even though,it would be still throw an NPE when call {{partitionStateSerializer.equals()}} if {{partitionStateSerializer}} is null and {{((Snapshot) obj).getPartitionStateSerializer() == null}} return a false. So, I believe in this case, we should return a false instead throw an NPE. 
> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-6493: --- Assignee: mingleizhang > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()
[ https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085159#comment-16085159 ] mingleizhang commented on FLINK-5541: - Thanks Ted for reporting this, I will take a look. > Missing null check for localJar in FlinkSubmitter#submitTopology() > -- > > Key: FLINK-5541 > URL: https://issues.apache.org/jira/browse/FLINK-5541 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Ted Yu >Priority: Minor > > {code} > if (localJar == null) { > try { > for (final URL url : ((ContextEnvironment) > ExecutionEnvironment.getExecutionEnvironment()) > .getJars()) { > // TODO verify that there is only one jar > localJar = new File(url.toURI()).getAbsolutePath(); > } > } catch (final URISyntaxException e) { > // ignore > } catch (final ClassCastException e) { > // ignore > } > } > logger.info("Submitting topology " + name + " in distributed mode with > conf " + serConf); > client.submitTopologyWithOpts(name, localJar, topology); > {code} > Since the try block may encounter URISyntaxException / ClassCastException, we > should check that localJar is not null before calling > submitTopologyWithOpts(). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085054#comment-16085054 ] mingleizhang commented on FLINK-6493: - Actually, sorry, I don't think the null check is ineffective as such: it is combined with another clause, and the non-empty constructor guards the field with {{Preconditions.checkNotNull}}. The empty constructor, however, performs no null check. So calling {{partitionStateSerializer.equals()}} can still throw an NPE when {{partitionStateSerializer}} is null and {{((Snapshot) obj).getPartitionStateSerializer() == null}} returns false. In that case, I believe we should return false instead of throwing an NPE. > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
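A null-safe comparison, for instance via {{java.util.Objects.equals}}, returns false when only one side is null instead of throwing. The following is a minimal sketch of that idea, with the field type and class simplified from the snippet above (not the actual Flink implementation):

```java
import java.util.Objects;

class Snapshot {
    private final Object partitionStateSerializer; // simplified stand-in type

    Snapshot(Object partitionStateSerializer) {
        this.partitionStateSerializer = partitionStateSerializer;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Snapshot)) {
            return false;
        }
        // Objects.equals() handles the case where only one side is null:
        // it returns false rather than throwing a NullPointerException.
        return Objects.equals(partitionStateSerializer,
                ((Snapshot) obj).partitionStateSerializer);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(partitionStateSerializer);
    }
}
```

With this pattern, a snapshot built via the empty constructor (field still null) compares as unequal to a populated one rather than crashing.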
[jira] [Assigned] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase
[ https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-6105: --- Assignee: mingleizhang > Properly handle InterruptedException in HadoopInputFormatBase > - > > Key: FLINK-6105 > URL: https://issues.apache.org/jira/browse/FLINK-6105 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Ted Yu >Assignee: mingleizhang > > When catching InterruptedException, we should throw InterruptedIOException > instead of IOException. > The following example is from HadoopInputFormatBase : > {code} > try { > splits = this.mapreduceInputFormat.getSplits(jobContext); > } catch (InterruptedException e) { > throw new IOException("Could not get Splits.", e); > } > {code} > There may be other places where IOE is thrown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
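The suggested fix can be sketched as follows: rethrow {{InterruptedIOException}} (a subclass of {{IOException}}, so the method signature is unchanged) and restore the thread's interrupt flag. The {{Thread.sleep}} call is a stand-in for {{mapreduceInputFormat.getSplits(jobContext)}}, which is not available in this snippet:

```java
import java.io.IOException;
import java.io.InterruptedIOException;

class SplitFetcher {
    static void getSplits() throws IOException {
        try {
            Thread.sleep(10); // stand-in for mapreduceInputFormat.getSplits(jobContext)
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can still observe it.
            Thread.currentThread().interrupt();
            // InterruptedIOException signals interruption while keeping the
            // IOException contract of the enclosing method.
            InterruptedIOException ioe =
                    new InterruptedIOException("Could not get Splits.");
            ioe.initCause(e);
            throw ioe;
        }
    }
}
```

The key differences from the original snippet are the {{Thread.currentThread().interrupt()}} call and the more specific exception type.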
[jira] [Assigned] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()
[ https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-5541: --- Assignee: mingleizhang > Missing null check for localJar in FlinkSubmitter#submitTopology() > -- > > Key: FLINK-5541 > URL: https://issues.apache.org/jira/browse/FLINK-5541 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > if (localJar == null) { > try { > for (final URL url : ((ContextEnvironment) > ExecutionEnvironment.getExecutionEnvironment()) > .getJars()) { > // TODO verify that there is only one jar > localJar = new File(url.toURI()).getAbsolutePath(); > } > } catch (final URISyntaxException e) { > // ignore > } catch (final ClassCastException e) { > // ignore > } > } > logger.info("Submitting topology " + name + " in distributed mode with > conf " + serConf); > client.submitTopologyWithOpts(name, localJar, topology); > {code} > Since the try block may encounter URISyntaxException / ClassCastException, we > should check that localJar is not null before calling > submitTopologyWithOpts(). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
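One way to implement the check the issue asks for is to fail fast with a descriptive message before the client call. The sketch below is self-contained and hedged: {{Submitter.submit}} is a hypothetical stand-in for {{FlinkSubmitter#submitTopology()}}, and the real {{client.submitTopologyWithOpts}} call is only indicated in a comment:

```java
class Submitter {
    static void submit(String name, String localJar) {
        if (localJar == null) {
            // The try block above may swallow URISyntaxException /
            // ClassCastException, leaving localJar null; report that clearly
            // instead of passing null to the client.
            throw new IllegalArgumentException(
                    "Could not determine jar file for topology '" + name
                            + "': no jar was configured and none could be derived from the context.");
        }
        // client.submitTopologyWithOpts(name, localJar, topology); // real call elided
    }
}
```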
[jira] [Assigned] (FLINK-5987) Upgrade zookeeper dependency to 3.4.10
[ https://issues.apache.org/jira/browse/FLINK-5987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-5987: --- Assignee: mingleizhang > Upgrade zookeeper dependency to 3.4.10 > -- > > Key: FLINK-5987 > URL: https://issues.apache.org/jira/browse/FLINK-5987 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Assignee: mingleizhang > > zookeeper 3.4.10 has been released. > Among the fixes the following are desirable: > ZOOKEEPER-706 large numbers of watches can cause session re-establishment to > fail > ZOOKEEPER-1797 PurgeTxnLog may delete data logs during roll > This issue upgrades zookeeper dependency to 3.4.10 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-5486: --- Assignee: mingleizhang > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
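The fix the issue describes is to move the processing call inside the synchronized block, so no other thread can clear entries while they are still being handled. A minimal sketch with a simplified stand-in for the bucket state (the real file-moving logic is elided):

```java
import java.util.HashMap;
import java.util.Map;

class BucketState {
    final Map<Long, String> pendingFilesPerCheckpoint = new HashMap<>();

    void handleRestoredBucketState() {
        // Both the processing and the clear() happen under the same lock,
        // so entries cannot be cleared mid-processing.
        synchronized (pendingFilesPerCheckpoint) {
            for (String pendingFile : pendingFilesPerCheckpoint.values()) {
                // handlePendingFilesForPreviousCheckpoints(...) would move
                // each pending file to its final location here (elided).
            }
            pendingFilesPerCheckpoint.clear();
        }
    }
}
```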
[jira] [Updated] (FLINK-7118) Remove hadoop1.x code in HadoopUtils
[ https://issues.apache.org/jira/browse/FLINK-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-7118: Description: Since Flink no longer supports Hadoop 1.x, we should remove this code. The code below resides in {{org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils}} {code:java} public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception { try { Class clazz; // for Hadoop 1.xx if(JobContext.class.isInterface()) { clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader()); } // for Hadoop 2.xx else { clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader()); } Constructor constructor = clazz.getConstructor(Configuration.class, JobID.class); JobContext context = (JobContext) constructor.newInstance(configuration, jobId); return context; } catch(Exception e) { throw new Exception("Could not create instance of JobContext."); } } {code} And {code:java} public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID taskAttemptID) throws Exception { try { Class clazz; // for Hadoop 1.xx if(JobContext.class.isInterface()) { clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); } // for Hadoop 2.xx else { clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext"); } Constructor constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class); TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID); return context; } catch(Exception e) { throw new Exception("Could not create instance of TaskAttemptContext."); } } {code} was: Since flink no longer support hadoop 1.x version, we should remove it.
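The reflective branching in the FLINK-7118 description exists only to pick between the Hadoop 1.x and 2.x class; with Hadoop 1.x support dropped, {{Class.forName}} plus {{Constructor.newInstance}} can collapse into a direct constructor call. The sketch below illustrates that simplification with a JDK class ({{ArrayList}}) as a stand-in, since the Hadoop classes are not on this snippet's classpath:

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;

class ReflectionRemoval {
    // The old pattern: look up the class by name and invoke its constructor
    // reflectively, as HadoopUtils does for JobContextImpl.
    @SuppressWarnings("unchecked")
    static List<Object> viaReflection() {
        try {
            Class<?> clazz = Class.forName("java.util.ArrayList");
            Constructor<?> ctor = clazz.getConstructor(int.class);
            return (List<Object>) ctor.newInstance(16);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not create instance.", e);
        }
    }

    // The simplified equivalent once only one target class remains:
    // a plain, compile-time-checked constructor call.
    static List<Object> direct() {
        return new ArrayList<>(16);
    }
}
```

In the actual fix, {{direct()}} would correspond to {{new JobContextImpl(configuration, jobId)}} (and similarly for {{TaskAttemptContextImpl}}).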
[jira] [Commented] (FLINK-7114) Remove the hardcoded way like classpath is written by a string
[ https://issues.apache.org/jira/browse/FLINK-7114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16078199#comment-16078199 ] mingleizhang commented on FLINK-7114: - Thanks, Chesnay. That makes sense. > Remove the hardcoded way like classpath is written by a string > -- > > Key: FLINK-7114 > URL: https://issues.apache.org/jira/browse/FLINK-7114 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: mingleizhang >Assignee: mingleizhang > > For example, in the {{HadoopFileSystem}} class there is a field written as {{private static final String DEFAULT_HDFS_CLASS = > "org.apache.hadoop.hdfs.DistributedFileSystem";}}. This approach has a drawback: if the class name changes, the error is hard to track down. I suggest we change it to a more flexible way. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
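The alternative the report hints at can be sketched like this: derive the name from a {{Class}} literal so a rename is caught at compile time. Note this only works when the class is on the compile-time classpath; for optional dependencies (the Hadoop case above, where {{DistributedFileSystem}} must not be a compile-time dependency) the string literal may be unavoidable, which is presumably the point Chesnay made. A JDK class is used here as a stand-in:

```java
class ClassNames {
    // Fragile: a rename of the class silently invalidates this string.
    static final String HARDCODED = "java.util.HashMap";

    // Compile-time checked: the compiler and IDE track the reference.
    static final String DERIVED = java.util.HashMap.class.getName();
}
```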
[jira] [Commented] (FLINK-7134) Remove hadoop1.x code in mapreduce.utils.HadoopUtils
[ https://issues.apache.org/jira/browse/FLINK-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092465#comment-16092465 ] mingleizhang commented on FLINK-7134: - I will start working on this soon. > Remove hadoop1.x code in mapreduce.utils.HadoopUtils > > > Key: FLINK-7134 > URL: https://issues.apache.org/jira/browse/FLINK-7134 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: mingleizhang >Assignee: mingleizhang > > This jira is similar to FLINK-7118. For a clearer format and an easier review, I separated it into two jiras. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7225) Cutoff exception message in StateDescriptor
[ https://issues.apache.org/jira/browse/FLINK-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094600#comment-16094600 ] mingleizhang commented on FLINK-7225: - [~StephanEwen] Sorry, I don't understand exactly what you mean. Do you agree that what I mentioned above, {{please make the type to be parameterized by using generics}}, is correct? Or do you mean I need to add another {{StateDescriptor}} constructor like the following, taking a {{TypeHint}} as a parameter? {code:java} protected StateDescriptor(String name, TypeHint type, T defaultValue) { this.name = requireNonNull(name, "name must not be null"); requireNonNull(type, "type class must not be null"); try { this.typeInfo = type.getTypeInfo(); {code} > Cutoff exception message in StateDescriptor > --- > > Key: FLINK-7225 > URL: https://issues.apache.org/jira/browse/FLINK-7225 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler > Fix For: 1.4.0, 1.3.2 > > > When the type extraction fails in the StateDescriptor constructor an > exception is thrown, but the message is cutoff and doesn't contain any advice > to remedy the situation. > {code} > try { > this.typeInfo = TypeExtractor.createTypeInfo(type); > } catch (Exception e) { > throw new RuntimeException("Cannot create full type > information based on the given class. If the type has generics, please", e); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
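One possible way to complete the truncated message is sketched below; the wording of the eventual Flink fix may differ, and {{TypeInfoErrors.describe}} is a hypothetical helper used here so the snippet stays self-contained:

```java
class TypeInfoErrors {
    // Wraps a type-extraction failure in a RuntimeException whose message
    // actually tells the user how to remedy the situation, instead of
    // trailing off mid-sentence.
    static RuntimeException describe(Exception cause) {
        return new RuntimeException(
                "Cannot create full type information based on the given class. "
                        + "If the type has generics, please pass the TypeInformation "
                        + "or a TypeHint explicitly instead of the raw Class.",
                cause);
    }
}
```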
[jira] [Commented] (FLINK-4421) Make clocks and time measurements monotonous
[ https://issues.apache.org/jira/browse/FLINK-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095699#comment-16095699 ] mingleizhang commented on FLINK-4421: - Hi, Stephan. Thanks for reporting this. I would like to know under what circumstances clock updates occur, that is, the situation you said can lead to negative durations. I would also like to look into {{Introduce a Clock utility for monotonous system timestamps}}. For example, if I call {{System.currentTimeMillis()}} today and it returns 12345, but tomorrow it returns 12145, that would produce a negative duration. > Make clocks and time measurements monotonous > > > Key: FLINK-4421 > URL: https://issues.apache.org/jira/browse/FLINK-4421 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Priority: Minor > > Currently, many places use {{System.currentTimeMillis()}} to acquire > timestamps or measure time intervals. > Since this relies on the system clock, and the system clock is not > necessarily monotonous (in the presence of clock updates), this can lead to > negative duration and decreasing timestamps where increasing timestamps are > expected. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-4421) Make clocks and time measurements monotonous
[ https://issues.apache.org/jira/browse/FLINK-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095699#comment-16095699 ] mingleizhang edited comment on FLINK-4421 at 7/21/17 2:46 AM: -- Hi, [~StephanEwen]. Thanks for reporting this. I would like to know under what circumstances clock updates occur, that is, the situation you said can lead to negative durations. I would also like to look into {{Introduce a Clock utility for monotonous system timestamps}}. For example, if I call {{System.currentTimeMillis()}} today and it returns 12345, but tomorrow it returns 12145, that would produce a negative duration. was (Author: mingleizhang): Hi, Stephan. Thanks for reporting this. I would like to know, Under what circumstances will lead to clock updates ? that is what you said can lead to negative duration. And I would like to try read {{Introduce a Clock utility for monotonous system timestamps}}. Like today I run a code like {{System.currentTimeMillis()}} return 12345. Tomorrow, I run {{System.currentTimeMillis()}} return 12145. So, It can lead to negative duration. > Make clocks and time measurements monotonous > > > Key: FLINK-4421 > URL: https://issues.apache.org/jira/browse/FLINK-4421 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Priority: Minor > > Currently, many places use {{System.currentTimeMillis()}} to acquire > timestamps or measure time intervals. > Since this relies on the system clock, and the system clock is not > necessarily monotonous (in the presence of clock updates), this can lead to > negative duration and decreasing timestamps where increasing timestamps are > expected. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Issue Comment Deleted] (FLINK-4421) Make clocks and time measurements monotonous
[ https://issues.apache.org/jira/browse/FLINK-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-4421: Comment: was deleted (was: Hi, [~StephanEwen]. Thanks for reporting this. I would like to know, Under what circumstances will lead to clock updates ? that is what you said can lead to negative duration. And I would like to try read {{Introduce a Clock utility for monotonous system timestamps}}. Like today I run a code like {{System.currentTimeMillis()}} return 12345. Tomorrow, I run {{System.currentTimeMillis()}} return 12145. So, It can lead to negative duration.) > Make clocks and time measurements monotonous > > > Key: FLINK-4421 > URL: https://issues.apache.org/jira/browse/FLINK-4421 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Priority: Minor > > Currently, many places use {{System.currentTimeMillis()}} to acquire > timestamps or measure time intervals. > Since this relies on the system clock, and the system clock is not > necessarily monotonous (in the presence of clock updates), this can lead to > negative duration and decreasing timestamps where increasing timestamps are > expected. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4423) Introduce a Clock utility for monotonous system timestamps
[ https://issues.apache.org/jira/browse/FLINK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095743#comment-16095743 ] mingleizhang commented on FLINK-4423: - [~StephanEwen] This is a very interesting topic indeed. One question I would like to ask and confirm: we are adding the Clock utility because we want to work around {{System.currentTimeMillis()}} not being monotonous, right? And by remembering the max returned timestamp, the new function gains monotonicity similar to {{System.nanoTime()}}. Do I understand correctly? > Introduce a Clock utility for monotonous system timestamps > -- > > Key: FLINK-4423 > URL: https://issues.apache.org/jira/browse/FLINK-4423 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen > > I suggest to introduce a {{Clock}} class that provides a > {{currentTimeMillis()}} function that calls {{System.currentTimeMillis()}} > but also remembers the max returned timestamp so far. That way it would never > return decreasing timestamps. > In the presence of clock backwards adjustments, the appearance would be that > time stands still for a while, until the clock has caught up with the > previous timestamp. > Since we don't rely on this for measuring timeouts, but only for logging / > visualization / etc (see [FLINK-4422]) it should not mess up any distributed > system behavior. > We would use this in places like the {{ExecutionGraph}}, where we record > timestamps for state transitions. That way, the utilities that derive charts > and times from the status timestamps would not be thrown off if timestamps > were decreasing when expected increasing. > The same holds for ingestion time timestamps and for processing time triggers. > NOTE: I would like some other opinions on that - it is a somewhat delicate > matter. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
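The proposal in the description, remember the max timestamp returned so far so that a backwards clock adjustment makes time appear to stand still rather than go backwards, can be sketched in a few lines. This is a minimal illustration, not Flink's eventual implementation; the class name is assumed:

```java
import java.util.concurrent.atomic.AtomicLong;

class MonotonousClock {
    private final AtomicLong maxSeen = new AtomicLong(Long.MIN_VALUE);

    long currentTimeMillis() {
        long now = System.currentTimeMillis();
        // accumulateAndGet applies Math::max atomically, so even concurrent
        // callers never observe a decreasing value from this clock.
        return maxSeen.accumulateAndGet(now, Math::max);
    }
}
```

If the system clock jumps back, successive calls keep returning the previously seen maximum until the wall clock catches up, matching the "time stands still" behavior the issue describes.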
[jira] [Commented] (FLINK-5130) Remove Deprecated Methods from WindowedStream
[ https://issues.apache.org/jira/browse/FLINK-5130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16096110#comment-16096110 ] mingleizhang commented on FLINK-5130: - If agreed, I won't do it until 2.0.0 development begins. > Remove Deprecated Methods from WindowedStream > - > > Key: FLINK-5130 > URL: https://issues.apache.org/jira/browse/FLINK-5130 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Aljoscha Krettek > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-3992) Remove Key interface
[ https://issues.apache.org/jira/browse/FLINK-3992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16096123#comment-16096123 ] mingleizhang commented on FLINK-3992: - Could you please add more details here, [~Zentol]? Thanks. :) > Remove Key interface > > > Key: FLINK-3992 > URL: https://issues.apache.org/jira/browse/FLINK-3992 > Project: Flink > Issue Type: Sub-task > Components: DataSet API >Affects Versions: 1.0.0 >Reporter: Chesnay Schepler > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4048) Remove Hadoop dependencies from ExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16096120#comment-16096120 ] mingleizhang commented on FLINK-4048: - I would like to work on it when 2.0.0 comes. For now, I won't. > Remove Hadoop dependencies from ExecutionEnvironment > > > Key: FLINK-4048 > URL: https://issues.apache.org/jira/browse/FLINK-4048 > Project: Flink > Issue Type: Sub-task > Components: DataSet API >Reporter: Robert Metzger > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem
[ https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-5789: Summary: Make Bucketing Sink independent of Hadoop's FileSystem (was: Make Bucketing Sink independent of Hadoop's FileSysten) > Make Bucketing Sink independent of Hadoop's FileSystem > -- > > Key: FLINK-5789 > URL: https://issues.apache.org/jira/browse/FLINK-5789 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.0, 1.1.4 >Reporter: Stephan Ewen > Fix For: 1.4.0 > > > The {{BucketingSink}} is hard wired to Hadoop's FileSystem, bypassing Flink's > file system abstraction. > This causes several issues: > - The bucketing sink will behave differently than other file sinks with > respect to configuration > - Directly supported file systems (not through hadoop) like the MapR File > System do not work in the same way with the BucketingSink as other file > systems > - The previous point is all the more problematic in the effort to make > Hadoop an optional dependency and within other stacks (Mesos, Kubernetes, > AWS, GCE, Azure) with ideally no Hadoop dependency. > We should port the {{BucketingSink}} to use Flink's FileSystem classes. > To support the *truncate* functionality that is needed for the exactly-once > semantics of the Bucketing Sink, we should extend Flink's FileSystem > abstraction to have the methods > - {{boolean supportsTruncate()}} > - {{void truncate(Path, long)}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
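The two methods the issue proposes for the FileSystem abstraction could look roughly like the sketch below. The method names follow the issue text; everything else (the interface name, using a {{String}} path instead of Flink's {{Path}} class to keep the snippet self-contained, the default of {{false}}) is assumed for illustration:

```java
import java.io.IOException;

interface TruncatableFileSystem {
    /**
     * Whether this file system implementation can truncate files.
     * Sinks needing exactly-once semantics would check this before
     * relying on truncate-based recovery.
     */
    default boolean supportsTruncate() {
        return false;
    }

    /** Truncate the file at the given path to the given length in bytes. */
    void truncate(String path, long length) throws IOException;
}
```

A capability-query method plus the operation itself lets callers fall back to an alternative recovery strategy (e.g. writing a ".valid-length" marker file) on file systems without truncate support.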