Re: How to get the temp result of each dynamic table when executing Flink-SQL?

2019-01-07 Thread yinhua.dai
In our case, we wrote a console table sink which prints everything to the
console, and we use "insert into" to write the interim result to that console sink.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: The way to write a UDF with generic type

2019-01-07 Thread yinhua.dai
Hi Timo,

Can you let me know how the built-in "MAX" function is able to support
different types?
Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Building Flink from source according to vendor-specific version but causes protobuf conflict

2019-01-07 Thread Wei Sun
Hi, Timo


Good day!


Thank you for your help! This issue has been solved with the rebuilt Flink
version. But I found that it does not work with the
'Apache Flink 1.7.1 only' version, even if I configure the classpath with
export HADOOP_CLASSPATH=`hadoop classpath`. I will check it later.
Thanks again.


Best Regards
Wei


-- Original --
From: "Timo Walther"
Date: Jan 8, 2019
To: "user"
Cc: "gary"
Subject: Re: Building Flink from source according to vendor-specific
version but causes protobuf conflict



Hi Wei,

did you play around with the classloading options mentioned here [1]? The -d
option might impact how classes are loaded when the job is deployed on the
cluster.

I will loop in Gary, who might know more about the YARN behavior.

Regards,
Timo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#user-jars--classpath


Am 07.01.19 um 10:33 schrieb Wei Sun:

Hi guys,

Good day.

I rebuilt Flink from the source and specified the vendor-specific Hadoop
version. It works well when I just submit a streaming application without
the '-d' (--detached) option, as follows:
bin/flink run -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096
-ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter
./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf

But if I add the '-d' (--detached) option, an
'org.apache.flink.client.deployment.ClusterDeploymentException' will be
thrown out to the CLI. Just as:
bin/flink run -d -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm
3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter
./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf

Exception start
The program finished with the following exception:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1544777537685_0068 failed 2 times
due to AM Container for appattempt_1544777537685_0068_02 exited with exitCode: 1
For more detailed output, check application tracking
page: http://103-8-200-sh-100-F07.yidian.com:8088/proxy/application_1544777537685_0068/
Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e03_1544777537685_0068_02_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
at org.apache.hadoop.util.Shell.run(Shell.java:460)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(Con

Re: onTimer function is not getting executed and job is marked as finished.

2019-01-07 Thread Puneet Kinra
Hi hequan

Weird behaviour when i m calling ctx.timeservice() function is getting
exited even not throwing error

On Tuesday, January 8, 2019, Hequn Cheng  wrote:

> Hi puneet,
>
> Could you print `parseLong + 5000` and 
> `ctx.timerService().currentProcessingTime()`
> out and check the value?
> I know it is a streaming program. What I mean is the timer you have
> registered is not within the interval of your job, so the timer has not
> been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 =
> 1000(very big).
>
> Best, Hequn
>
>
> On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra  com> wrote:
>
>> I checked the same the function is getting exited when i am calling
>> ctx.getTimeservice () function.
>>
>> On Mon, Jan 7, 2019 at 10:27 PM Timo Walther  wrote:
>>
>>> Hi Puneet,
>>>
>>> maybe you can show or explain us a bit more about your pipeline. From
>>> what I see your ProcessFunction looks correct. Are you sure the registering
>>> takes place?
>>>
>>> Regards,
>>> Timo
>>>
>>> Am 07.01.19 um 14:15 schrieb Puneet Kinra:
>>>
>>> Hi Hequn
>>>
>>> Its a streaming job .
>>>
>>> On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng  wrote:
>>>
 Hi Puneet,

 The value of the registered timer should within startTime and endTime
 of your job. For example, job starts at processing time t1 and stops at
 processing time t2. You have to make sure t1< `parseLong + 5000` < t2.

 Best, Hequn

 On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <
 puneet.ki...@customercentria.com> wrote:

> Hi All
>
> Facing some issue with context to onTimer method in processfunction
>
> class TimerTest extends ProcessFunction,String>{
>
> /**
> *
> */
> private static final long serialVersionUID = 1L;
>
> @Override
> public void processElement(Tuple2 arg0,
> ProcessFunction, String>.Context ctx,
> Collector arg2) throws Exception {
> // TODO Auto-generated method stub
> long parseLong = Long.parseLong(arg0.f1);
> TimerService timerService = ctx.timerService();
> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
> }
>
> @Override
> public void onTimer(long timestamp, ProcessFunction String>, String>.OnTimerContext ctx,
> Collector out) throws Exception {
> // TODO Auto-generated method stub
> super.onTimer(timestamp, ctx, out);
> System.out.println("Executing timmer"+timestamp);
> out.collect("Timer Testing..");
> }
> }
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>
>>>
>>> --
>>> *Cheers *
>>>
>>> *Puneet Kinra*
>>>
>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>> *
>>>
>>> *e-mail :puneet.ki...@customercentria.com
>>> *
>>>
>>>
>>>
>>>
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> *
>>
>> *e-mail :puneet.ki...@customercentria.com
>> *
>>
>>
>>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: onTimer function is not getting executed and job is marked as finished.

2019-01-07 Thread Hequn Cheng
Hi puneet,

Could you print `parseLong + 5000` and
`ctx.timerService().currentProcessingTime()` out and check the value?
I know it is a streaming program. What I mean is that the timer you have
registered is not within the interval of your job, so the timer has not
been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 =
1000 (very big).

Best, Hequn


On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> I checked the same the function is getting exited when i am calling
> ctx.getTimeservice () function.
>
> On Mon, Jan 7, 2019 at 10:27 PM Timo Walther  wrote:
>
>> Hi Puneet,
>>
>> maybe you can show or explain us a bit more about your pipeline. From
>> what I see your ProcessFunction looks correct. Are you sure the registering
>> takes place?
>>
>> Regards,
>> Timo
>>
>> Am 07.01.19 um 14:15 schrieb Puneet Kinra:
>>
>> Hi Hequn
>>
>> Its a streaming job .
>>
>> On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng  wrote:
>>
>>> Hi Puneet,
>>>
>>> The value of the registered timer should within startTime and endTime of
>>> your job. For example, job starts at processing time t1 and stops at
>>> processing time t2. You have to make sure t1< `parseLong + 5000` < t2.
>>>
>>> Best, Hequn
>>>
>>> On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <
>>> puneet.ki...@customercentria.com> wrote:
>>>
 Hi All

 Facing some issue with context to onTimer method in processfunction

 class TimerTest extends ProcessFunction,String>{

 /**
 *
 */
 private static final long serialVersionUID = 1L;

 @Override
 public void processElement(Tuple2 arg0,
 ProcessFunction, String>.Context ctx,
 Collector arg2) throws Exception {
 // TODO Auto-generated method stub
 long parseLong = Long.parseLong(arg0.f1);
 TimerService timerService = ctx.timerService();
 ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
 }

 @Override
 public void onTimer(long timestamp, ProcessFunction>>> String>, String>.OnTimerContext ctx,
 Collector out) throws Exception {
 // TODO Auto-generated method stub
 super.onTimer(timestamp, ctx, out);
 System.out.println("Executing timmer"+timestamp);
 out.collect("Timer Testing..");
 }
 }

 --
 *Cheers *

 *Puneet Kinra*

 *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
 *

 *e-mail :puneet.ki...@customercentria.com
 *



>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> *
>>
>> *e-mail :puneet.ki...@customercentria.com
>> *
>>
>>
>>
>>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


Re: Kafka 2.0 quotas

2019-01-07 Thread Vishal Santoshi
Aah, quotas are broker-side "throttles", so I guess this applies to any client
API, including Flink?

On Mon, Jan 7, 2019 at 2:17 PM Vishal Santoshi 
wrote:

>
> https://www.cloudera.com/documentation/kafka/2-2-x/topics/kafka_performance.html#quotas
> Any thoughts around feasibility ( in fact is this even supported )  of
> using quotas in flink based kafka connectors to throttle consumption and
> possibly mitigating back pressure if the optimal consumption rate is a
> prior known ?
>


Kafka 2.0 quotas

2019-01-07 Thread Vishal Santoshi
https://www.cloudera.com/documentation/kafka/2-2-x/topics/kafka_performance.html#quotas
Any thoughts on the feasibility (in fact, is this even supported?) of
using quotas in Flink-based Kafka connectors to throttle consumption and
possibly mitigate back pressure when the optimal consumption rate is
known a priori?
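
For anyone who wants to experiment: the quotas in the linked page are enforced on the broker per client-id (or per user), so the main thing on the Flink side would be to pass a stable client.id in the properties handed to the Kafka connector. Below is a minimal sketch under that assumption; the topic, servers and ids are placeholders, and whether the connector keeps that exact client.id per subtask is worth verifying.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// e.g. inside the job's main()
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092");
props.setProperty("group.id", "my-flink-job");
// broker-side consumer quotas are matched against this client id
props.setProperty("client.id", "my-flink-job-client");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);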


Re: S3 StreamingFileSink never completes multipart uploads

2019-01-07 Thread Kostas Kloudas
No problem and thanks for looking into the problem!

I also commented on the JIRA.

Cheers,
Kostas

On Mon, Jan 7, 2019, 18:13 Addison Higham wrote:
> The not seeing the final post from the logs was what initially clued me
> into the same issue, so I figured it was the same issue, but I should have
> worded it better that it is one possible explanation.
>
> Sorry for the confusion!
>
> Addison
>
>
>
>
>
> On Fri, Jan 4, 2019 at 11:24 PM Kostas Kloudas  wrote:
>
>> Hi Addison,
>>
>> From the information that Nick provides, how can you be sure that the
>> root cause is the same?
>>
>> Cheers,
>> Kostas
>>
>> On Fri, Jan 4, 2019, 22:10 Addison Higham wrote:
>>> Hi Nick,
>>>
>>> This is a known issue with 1.7.0, I have an issue opened up here:
>>> https://issues.apache.org/jira/browse/FLINK-11187
>>>
>>>
>>>
>>> On Wed, Jan 2, 2019 at 5:00 PM Martin, Nick  wrote:
>>>
 I’m running on Flink 1.7.0 trying to use the StreamingFileSink with an
 S3A URI. What I’m seeing is that whenever the RollingPolicy determines that
 it’s time to roll to a new part file, the whole Sink just hangs, and the in
 progress MultiPart Upload never gets completed. I’ve looked at the traffic
 between Flink and the S3 endpoint, and I don’t ever see the POST message
 that should close off a completed upload. Has anyone else run into
 something like that?







 Nick Martin



 --
 Notice: This e-mail is intended solely for use of the individual or
 entity to which it is addressed and may contain information that is
 proprietary, privileged and/or exempt from disclosure under applicable law.
 If the reader is not the intended recipient or agent responsible for
 delivering the message to the intended recipient, you are hereby notified
 that any dissemination, distribution or copying of this communication is
 strictly prohibited. This communication may also contain data subject to
 U.S. export laws. If so, data subject to the International Traffic in Arms
 Regulation cannot be disseminated, distributed, transferred, or copied,
 whether incorporated or in its original form, to foreign nationals residing
 in the U.S. or abroad, absent the express prior approval of the U.S.
 Department of State. Data subject to the Export Administration Act may not
 be disseminated, distributed, transferred or copied contrary to U. S.
 Department of Commerce regulations. If you have received this communication
 in error, please notify the sender by reply e-mail and destroy the e-mail
 message and any physical copies made of the communication.
  Thank you.
 *

>>>


Re: How to migrate Kafka Producer ?

2019-01-07 Thread Edward Rojas
Hi Piotr,

Thank you for looking into this.
Do you have an idea when the next version (1.7.2) will be available?

Also, could you validate or invalidate the approach I proposed in the
previous comment?


Edward Rojas wrote
> Regarding the kafka producer I am just updating the job with the new
> connector and removing the previous one and upgrading the job by using a
> savepoint and the --allowNonRestoredState. 
> So far my tests with this option are successful.

Is there any risk in using this approach and just ignoring the state of the
previous producer?


Thanks again for your help.

Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: onTimer function is not getting executed and job is marked as finished.

2019-01-07 Thread Puneet Kinra
I checked the same; the function exits when I call the
ctx.timerService() function.

On Mon, Jan 7, 2019 at 10:27 PM Timo Walther  wrote:

> Hi Puneet,
>
> maybe you can show or explain us a bit more about your pipeline. From what
> I see your ProcessFunction looks correct. Are you sure the registering
> takes place?
>
> Regards,
> Timo
>
> Am 07.01.19 um 14:15 schrieb Puneet Kinra:
>
> Hi Hequn
>
> Its a streaming job .
>
> On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng  wrote:
>
>> Hi Puneet,
>>
>> The value of the registered timer should within startTime and endTime of
>> your job. For example, job starts at processing time t1 and stops at
>> processing time t2. You have to make sure t1< `parseLong + 5000` < t2.
>>
>> Best, Hequn
>>
>> On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <
>> puneet.ki...@customercentria.com> wrote:
>>
>>> Hi All
>>>
>>> Facing some issue with context to onTimer method in processfunction
>>>
>>> class TimerTest extends ProcessFunction,String>{
>>>
>>> /**
>>> *
>>> */
>>> private static final long serialVersionUID = 1L;
>>>
>>> @Override
>>> public void processElement(Tuple2 arg0,
>>> ProcessFunction, String>.Context ctx,
>>> Collector arg2) throws Exception {
>>> // TODO Auto-generated method stub
>>> long parseLong = Long.parseLong(arg0.f1);
>>> TimerService timerService = ctx.timerService();
>>> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
>>> }
>>>
>>> @Override
>>> public void onTimer(long timestamp, ProcessFunction>> String>, String>.OnTimerContext ctx,
>>> Collector out) throws Exception {
>>> // TODO Auto-generated method stub
>>> super.onTimer(timestamp, ctx, out);
>>> System.out.println("Executing timmer"+timestamp);
>>> out.collect("Timer Testing..");
>>> }
>>> }
>>>
>>> --
>>> *Cheers *
>>>
>>> *Puneet Kinra*
>>>
>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>> *
>>>
>>> *e-mail :puneet.ki...@customercentria.com
>>> *
>>>
>>>
>>>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>
>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: Building Flink from source according to vendor-specific version but causes protobuf conflict

2019-01-07 Thread Timo Walther

Hi Wei,

did you play around with the classloading options mentioned here [1]? The -d 
option might impact how classes are loaded when the job is deployed on 
the cluster.


I will loop in Gary, who might know more about the YARN behavior.

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#user-jars--classpath



Am 07.01.19 um 10:33 schrieb Wei Sun:

Hi guys,

Good day.

I rebuilt Flink from the source and specified the vendor-specific 
Hadoop version. It works well when I just submit a streaming 
application without the '-d' (--detached) option, as follows:
bin/flink run -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 
3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter 
./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf


But if I add the '-d' (--detached) option, an 
'org.apache.flink.client.deployment.ClusterDeploymentException' will 
be thrown out to the CLI. Just as:
bin/flink run -d -m yarn-cluster -yqu root.streaming -yn 5 -yjm 
2048 -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter 
./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf


Exception start

 The program finished with the following exception:
org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not deploy Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1126)

at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: 
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during 
deployment.
Diagnostics from YARN: Application application_1544777537685_0068 
failed 2 times due to AM Container for 
appattempt_1544777537685_0068_02 exited with  exitCode: 1
For more detailed output, check application tracking 
page:http://103-8-200-sh-100-F07.yidian.com:8088/proxy/application_1544777537685_0068/Then, 
click on links to logs of each attempt.

Diagnostics: Exception from container-launch.
Container id: container_e03_1544777537685_0068_02_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
at org.apache.hadoop.util.Shell.run(Shell.java:460)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to 
further investigate the issue:

yarn logs -applicationId application_1544777537685_0068
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1065)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)

... 9 more
2019-01-07 17:08:55,463 INFO 
 org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cancelling 
deployment from Deployment Failure Hook
2019-01-07 17:08:55,464 INFO 
 org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Killing YARN 
application
2019-01-07 17:08:55,471 INFO 
 org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deleting 
files in

*---End--EndEnd-*

My cluster has enable the log aggregation, so I executed the following 
command:
yarn logs -applicationId applic

Re: S3 StreamingFileSink never completes multipart uploads

2019-01-07 Thread Addison Higham
Not seeing the final POST in the logs was what initially clued me
into the same issue, so I figured it was the same one, but I should have
worded it better: that is one possible explanation.

Sorry for the confusion!

Addison





On Fri, Jan 4, 2019 at 11:24 PM Kostas Kloudas  wrote:

> Hi Addison,
>
> From the information that Nick provides, how can you be sure that the root
> cause is the same?
>
> Cheers,
> Kostas
>
> On Fri, Jan 4, 2019, 22:10 Addison Higham wrote:
>> Hi Nick,
>>
>> This is a known issue with 1.7.0, I have an issue opened up here:
>> https://issues.apache.org/jira/browse/FLINK-11187
>>
>>
>>
>> On Wed, Jan 2, 2019 at 5:00 PM Martin, Nick  wrote:
>>
>>> I’m running on Flink 1.7.0 trying to use the StreamingFileSink with an
>>> S3A URI. What I’m seeing is that whenever the RollingPolicy determines that
>>> it’s time to roll to a new part file, the whole Sink just hangs, and the in
>>> progress MultiPart Upload never gets completed. I’ve looked at the traffic
>>> between Flink and the S3 endpoint, and I don’t ever see the POST message
>>> that should close off a completed upload. Has anyone else run into
>>> something like that?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Nick Martin
>>>
>>>
>>>
>>> --
>>> Notice: This e-mail is intended solely for use of the individual or
>>> entity to which it is addressed and may contain information that is
>>> proprietary, privileged and/or exempt from disclosure under applicable law.
>>> If the reader is not the intended recipient or agent responsible for
>>> delivering the message to the intended recipient, you are hereby notified
>>> that any dissemination, distribution or copying of this communication is
>>> strictly prohibited. This communication may also contain data subject to
>>> U.S. export laws. If so, data subject to the International Traffic in Arms
>>> Regulation cannot be disseminated, distributed, transferred, or copied,
>>> whether incorporated or in its original form, to foreign nationals residing
>>> in the U.S. or abroad, absent the express prior approval of the U.S.
>>> Department of State. Data subject to the Export Administration Act may not
>>> be disseminated, distributed, transferred or copied contrary to U. S.
>>> Department of Commerce regulations. If you have received this communication
>>> in error, please notify the sender by reply e-mail and destroy the e-mail
>>> message and any physical copies made of the communication.
>>>  Thank you.
>>> *
>>>
>>


Re: onTimer function is not getting executed and job is marked as finished.

2019-01-07 Thread Timo Walther

Hi Puneet,

maybe you can show or explain us a bit more about your pipeline. From 
what I see your ProcessFunction looks correct. Are you sure the 
registering takes place?


Regards,
Timo

Am 07.01.19 um 14:15 schrieb Puneet Kinra:

Hi Hequn

Its a streaming job .

On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng wrote:


Hi Puneet,

The value of the registered timer should be within the startTime and
endTime of your job. For example, the job starts at processing time t1
and stops at processing time t2. You have to make sure t1 <
`parseLong + 5000` < t2.

Best, Hequn

On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra
<puneet.ki...@customercentria.com> wrote:

Hi All

Facing some issue with context to the onTimer method in a
ProcessFunction:

class TimerTest extends
ProcessFunction<Tuple2<String, String>, String> {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void processElement(Tuple2<String, String> arg0,
ProcessFunction<Tuple2<String, String>, String>.Context ctx,
Collector<String> arg2) throws Exception {
// TODO Auto-generated method stub
long parseLong = Long.parseLong(arg0.f1);
TimerService timerService = ctx.timerService();
ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
}

@Override
public void onTimer(long timestamp,
ProcessFunction<Tuple2<String, String>, String>.OnTimerContext ctx,
Collector<String> out) throws Exception {
// TODO Auto-generated method stub
super.onTimer(timestamp, ctx, out);
System.out.println("Executing timmer"+timestamp);
out.collect("Timer Testing..");
}
}

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com*

*e-mail :puneet.ki...@customercentria.com*


--
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com*

*e-mail :puneet.ki...@customercentria.com*







Re: Buffer stats when Back Pressure is high

2019-01-07 Thread Timo Walther

Hi Gagan,

a typical solution to such a problem is to introduce an artifical key 
(enrichment id + some additional suffix), you can then keyBy on this 
artificial key and thus spread the workload more evenly. Of course you 
need to make sure that records of the second stream are duplicated to 
all operators with the same artificial key.


Depending on the frequency of the second stream, it might also worth to 
use a broadcast join that distributes the second stream to all operators 
such that all operators can perform the enrichment step in a round robin 
fashion.


Regards,
Timo
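
To make the artificial-key idea concrete, here is a rough sketch of the salting approach. It assumes both streams carry some Event type with a getEnrichmentId() accessor; the Event type, the dataStream/enrichmentStream variables, the SALTS value and MyEnrichmentFunction are illustrative names, not taken from the job discussed above.

import java.util.Random;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// inside a static main(), so the anonymous functions stay serializable
final int SALTS = 16;  // how many sub-keys each enrichment id is spread over

// High-volume stream: tag every record with a random salt so a hot id
// no longer lands on a single subtask.
DataStream<Tuple2<Integer, Event>> saltedData = dataStream
    .map(new MapFunction<Event, Tuple2<Integer, Event>>() {
        private final Random rnd = new Random();
        @Override
        public Tuple2<Integer, Event> map(Event e) {
            return Tuple2.of(rnd.nextInt(SALTS), e);
        }
    });

// Low-volume enrichment stream: duplicate each record to every salt so
// every (enrichment id, salt) key still sees the enrichment data.
DataStream<Tuple2<Integer, Event>> fannedOutEnrichment = enrichmentStream
    .flatMap(new FlatMapFunction<Event, Tuple2<Integer, Event>>() {
        @Override
        public void flatMap(Event e, Collector<Tuple2<Integer, Event>> out) {
            for (int salt = 0; salt < SALTS; salt++) {
                out.collect(Tuple2.of(salt, e));
            }
        }
    });

// Key by (enrichment id, salt) instead of the enrichment id alone.
saltedData.union(fannedOutEnrichment)
    .keyBy(new KeySelector<Tuple2<Integer, Event>, String>() {
        @Override
        public String getKey(Tuple2<Integer, Event> t) {
            return t.f1.getEnrichmentId() + "#" + t.f0;
        }
    })
    .process(new MyEnrichmentFunction());  // the existing stateful function; keyed state is now per (id, salt)

The trade-off is that any per-id state now lives in SALTS partial pieces, so this only fits if the enrichment step can work per (id, salt) or the partial results are merged afterwards.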

Am 07.01.19 um 14:45 schrieb Gagan Agrawal:

Flink Version is 1.7.
Thanks Zhijiang for your pointer. Initially I was checking only for 
few. However I just checked for all and found couple of them having 
queue length of 40+ which seems to be due to skewness in data. Is 
there any general guide lines on how to handle skewed data? In my case 
I am taking union and then keyBy (with custom stateful Process 
function) on enrichment id of 2 streams (1 enrichment stream with low 
volume and another regular data stream with high volume). I see that 
30% of my data stream records have same enrichment Id and hence go to 
same tasks which results in skewness. Any pointers on how to handle 
skewness while doing keyBy would be of great help.


Gagan

On Mon, Jan 7, 2019 at 3:25 PM zhijiang wrote:


Hi Gagan,

What Flink version do you use? And have you checked
buffers.inputQueueLength for all the related parallel instances of B
(connected with A)? There may be a scenario where only one parallel
instance of B is full of in-queue buffers, which back-pressures A,
while the input queues of the other parallel instances of B are empty.

Best,
Zhijiang

--
From: Gagan Agrawal <agrawalga...@gmail.com>
Send Time: Monday, January 7, 2019, 12:06
To: user <user@flink.apache.org>
Subject: Buffer stats when Back Pressure is high

Hi,
I want to understand does any of buffer stats help in
debugging / validating that downstream operator is performing
slow when Back Pressure is high? Say I have A -> B operators
and A shows High Back Pressure which indicates something wrong
or not performing well on B side which is slowing down
operator A. However when I look at buffers.inputQueueLength
for operator B, it's 0. My understanding is that when B is
processing slow, it's input buffer will be full of incoming
messages which ultimately blocks/slows down upstream operator
A. However it doesn't seem to be happening in my case. Can
someone throw some light on how should different stats around
buffers (e.g buffers.inPoolUsage, buffers.inputQueueLength,
numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) look
like when downstream operator is performing slow?

Gagan






Re: Reducing runtime of Flink planner

2019-01-07 Thread Timo Walther

Hi Niklas,

it would be interesting to know which planner caused the long runtime. 
Could you use a debugger to figure out more details? Is it really the 
Flink Table API planner or the underlying DataSet planner one level deeper?


There was an issue that was recently closed [1] about the DataSet 
optimizer. Could this solve your problem?


I will also loop in Fabian, who might know more.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-10566

Am 07.01.19 um 14:05 schrieb Niklas Teichmann:

Hi everybody,

I have a question concerning the planner for the Flink Table / Batch API.
At the moment I try to use a library called Cypher for Apache Flink, a 
project that tries to implement
the graph database query language Cypher on Apache Flink (CAPF, 
https://github.com/soerenreichardt/cypher-for-apache-flink).


The problem is that the planner seemingly takes a very long time to 
plan and optimize the job created by CAPF. This example job in json 
format


https://pastebin.com/J84grsjc

takes on a 24 GB data set about 20 minutes to plan and about 5 minutes 
to run the job. That seems very long for a job of this size.


Do you have any idea why this is the case?
Is there a way to give the planner hints to reduce the planning time?

Thanks in advance!
Niklas





Re: How to get the temp result of each dynamic table when executing Flink-SQL?

2019-01-07 Thread Timo Walther

Hi Henry,

such a feature is currently under discussion [1]; feel free to 
participate there and give feedback. So far you need to have some 
intermediate store; usually this would be Kafka or a filesystem.


I would recommend writing little unit tests that test each SQL step, 
as is done here [2].


I hope this helps.

Regards,
Timo

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Interactive-Programming-in-Flink-Table-API-tt25372.html#a25666
[2] 
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
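
As a small illustration of that suggestion, a per-step check on the 1.7 Java Table API could look roughly like the sketch below; the schema and the query are placeholders standing in for one of the views from the question, not taken from Henry's job.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// e.g. inside a unit test or a throwaway main()
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// a tiny in-memory stand-in for the real "source" table
DataStream<Tuple2<String, Integer>> input =
        env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
tEnv.registerDataStream("source", input, "name, score");

// the SQL behind "view A", checked in isolation
Table a = tEnv.sqlQuery("SELECT name, score FROM source WHERE score > 1");

// materialize the intermediate result so it can be inspected or asserted on
tEnv.toAppendStream(a, Row.class).print();
env.execute("debug view A");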



Am 07.01.19 um 16:19 schrieb 徐涛:

Hi Expert,
Usually when we write Flink-SQL program, usually we need to use 
multiple tables to get the final result, this is due to sometimes it is not 
possible to implement complicated logic in one SQL, sometimes due to the 
clarity of logic. For example:
create view A as
select * from source where xxx;

create view B as
select * from A where xxx;

create view C as
select * from B where xxx;

insert into sink
select * from C where xxx;

But when we write complicated logic, we may accomplish it step by step, 
make sure that the first step is correct, then go on with the next step. In 
batch program such as Hive or Spark, we usually write SQL like this, step by 
step.

For example:
create view A as
select * from source where xxx;
I want to check if the content in A is correct, if it is correct I go 
on to write another SQL. But I do not want to define a sink for each step, 
because it is not worthy just create a sink for such a “debug” step.
So is there a solution or best practice for such a scenario? How do we 
easily debug or verify the correctness  of a Flink SQL program?

Best
Henry





Re: The way to write a UDF with generic type

2019-01-07 Thread Timo Walther
Currently, there is no more flexible approach for aggregate functions. 
Scalar functions can be overloaded, but aggregate functions do not 
support this so far.


Regards,
Timo
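
A hedged sketch of the usual workaround, in line with the answer above: keep one parameterized base implementation and register a concrete, non-generic subclass per type, so every registered name has fixed result and accumulator types. All class names below are made up, and tableEnv is assumed to be an existing table environment.

import org.apache.flink.table.functions.AggregateFunction;

// accumulator holding the current maximum (null = no value seen yet)
public static class MaxAcc<T> {
    public T max;
}

public abstract static class MyMaxBase<T extends Comparable<T>>
        extends AggregateFunction<T, MaxAcc<T>> {

    @Override
    public MaxAcc<T> createAccumulator() {
        return new MaxAcc<>();
    }

    @Override
    public T getValue(MaxAcc<T> acc) {
        return acc.max;
    }

    public void accumulate(MaxAcc<T> acc, T value) {
        if (value != null && (acc.max == null || value.compareTo(acc.max) > 0)) {
            acc.max = value;
        }
    }
    // if type extraction of the generic accumulator fails, getResultType()
    // and getAccumulatorType() can be overridden in the concrete subclasses
}

// concrete subclasses fix the types; one registration per type is still needed
public static class MyLongMax extends MyMaxBase<Long> {}
public static class MyDoubleMax extends MyMaxBase<Double> {}

tableEnv.registerFunction("MYLONGMAX", new MyLongMax());
tableEnv.registerFunction("MYDOUBLEMAX", new MyDoubleMax());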


Am 07.01.19 um 02:27 schrieb yinhua.dai:

Hi Timo,

But getResultType should only return a concrete type information, right?
How could I implement with a generic type?

I'd like to clarify my questions again.
Say I want to implement my own "MAX" function, but I want to apply it to
different types, e.g. integer, long, double etc, so I tried to write a class
which extends AggregateFunction *with generic type* to implement the max
function.

Then I want to register only one function name for all types.
E.g.
tableEnv.registerFunction("MYMAX", new MyMax());
instead of
tableEnv.registerFunction("MYINTEGERMAX", new MyIntegerMax());
tableEnv.registerFunction("MYLONGMAX", new MyLongMax());
tableEnv.registerFunction("MYDOULBEMAX", new MyDoubleMax());

Is there a way to implement that?
I know the build in function "MAX" can apply to all types, so I wonder if I
can also implement that.
Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Passing vm options

2019-01-07 Thread Dominik Wosiński
Hey,
AFAIK, Flink currently supports dynamic properties only on YARN and not
really in standalone mode.
If you are using YARN, it should indeed be possible to set such a
configuration. If not, then I am afraid it is not possible.

Best Regards,
Dom.


On Mon, 7 Jan 2019 at 09:01, Avi Levi wrote:

> Hi ,
> I am trying to pass some vm options e.g
> bin/flink run foo-assembly-0.0.1-SNAPSHOT.jar
> -Dflink.stateDir=file:///tmp/ -Dkafka.bootstrap.servers="localhost:9092"
> -Dkafka.security.ssl.enabled=false
> but it doesn't seem to override the values in application.conf . Am I
> missing something?
> BTW is it possible to pass config file using -Dcofig.file ?
>
> BR
> Avi
>


How to get the temp result of each dynamic table when executing Flink-SQL?

2019-01-07 Thread 徐涛
Hi Expert,
When we write a Flink-SQL program, we usually need to use 
multiple tables to get the final result; this is because sometimes it is not 
possible to implement complicated logic in one SQL statement, and sometimes it is 
for the clarity of the logic. For example:
create view A as 
select * from source where xxx;

create view B as 
select * from A where xxx;

create view C as
select * from B where xxx;

insert into sink
select * from C where xxx;

But when we write complicated logic, we may accomplish it step by step, 
making sure that the first step is correct before going on with the next step. In 
batch systems such as Hive or Spark, we usually write SQL like this, step by 
step.

For example:
create view A as 
select * from source where xxx; 
I want to check whether the content of A is correct; if it is, I go 
on to write the next SQL statement. But I do not want to define a sink for each 
step, because it is not worth creating a sink just for such a "debug" step. 
So is there a solution or best practice for such a scenario? How do we 
easily debug or verify the correctness of a Flink SQL program?

Best
Henry

Re: Buffer stats when Back Pressure is high

2019-01-07 Thread Gagan Agrawal
Flink version is 1.7.
Thanks Zhijiang for your pointer. Initially I was checking only a few.
However, I just checked all of them and found a couple having a queue length
of 40+, which seems to be due to skewness in the data. Are there any general
guidelines on how to handle skewed data? In my case I am taking a union and
then keyBy (with a custom stateful ProcessFunction) on the enrichment id of 2
streams (1 enrichment stream with low volume and another regular data
stream with high volume). I see that 30% of my data stream records have the
same enrichment id and hence go to the same tasks, which results in skewness.
Any pointers on how to handle skewness while doing keyBy would be of great
help.

Gagan

On Mon, Jan 7, 2019 at 3:25 PM zhijiang  wrote:

> Hi Gagan,
>
> What flink version do you use? And have you checked the 
> buffers.inputQueueLength
> for all the related parallelism (connected with A) of B?  It may exist the
> scenario that only one parallelim B is full of inqueue buffers which back
> pressure A, and the input queue for other parallelism B is empty.
>
> Best,
> Zhijiang
>
> --
> From: Gagan Agrawal 
> Send Time: Monday, January 7, 2019, 12:06
> To: user 
> Subject: Buffer stats when Back Pressure is high
>
> Hi,
> I want to understand does any of buffer stats help in debugging /
> validating that downstream operator is performing slow when Back Pressure
> is high? Say I have A -> B operators and A shows High Back Pressure which
> indicates something wrong or not performing well on B side which is slowing
> down operator A. However when I look at buffers.inputQueueLength for
> operator B, it's 0. My understanding is that when B is processing slow,
> it's input buffer will be full of incoming messages which ultimately
> blocks/slows down upstream operator A. However it doesn't seem to be
> happening in my case. Can someone throw some light on how should different
> stats around buffers (e.g buffers.inPoolUsage, buffers.inputQueueLength,
> numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) look like when
> downstream operator is performing slow?
>
> Gagan
>
>
>


Re: onTimer function is not getting executed and job is marked as finished.

2019-01-07 Thread Puneet Kinra
Hi Hequn

It's a streaming job.

On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng  wrote:

> Hi Puneet,
>
> The value of the registered timer should within startTime and endTime of
> your job. For example, job starts at processing time t1 and stops at
> processing time t2. You have to make sure t1< `parseLong + 5000` < t2.
>
> Best, Hequn
>
> On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Hi All
>>
>> Facing some issue with context to onTimer method in processfunction
>>
>> class TimerTest extends ProcessFunction,String>{
>>
>> /**
>> *
>> */
>> private static final long serialVersionUID = 1L;
>>
>> @Override
>> public void processElement(Tuple2 arg0,
>> ProcessFunction, String>.Context ctx,
>> Collector arg2) throws Exception {
>> // TODO Auto-generated method stub
>> long parseLong = Long.parseLong(arg0.f1);
>> TimerService timerService = ctx.timerService();
>> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
>> }
>>
>> @Override
>> public void onTimer(long timestamp, ProcessFunction> String>, String>.OnTimerContext ctx,
>> Collector out) throws Exception {
>> // TODO Auto-generated method stub
>> super.onTimer(timestamp, ctx, out);
>> System.out.println("Executing timmer"+timestamp);
>> out.collect("Timer Testing..");
>> }
>> }
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> *
>>
>> *e-mail :puneet.ki...@customercentria.com
>> *
>>
>>
>>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Reducing runtime of Flink planner

2019-01-07 Thread Niklas Teichmann

Hi everybody,

I have a question concerning the planner for the Flink Table / Batch API.
At the moment I am trying to use a library called Cypher for Apache Flink, a
project that tries to implement
the graph database query language Cypher on Apache Flink (CAPF,  
https://github.com/soerenreichardt/cypher-for-apache-flink).


The problem is that the planner seemingly takes a very long time to  
plan and optimize the job created by CAPF. This example job in json  
format


https://pastebin.com/J84grsjc

takes on a 24 GB data set about 20 minutes to plan and about 5 minutes  
to run the job. That seems very long for a job of this size.


Do you have any idea why this is the case?
Is there a way to give the planner hints to reduce the planning time?

Thanks in advance!
Niklas
--





Re: onTimer function is not getting executed and job is marked as finished.

2019-01-07 Thread Hequn Cheng
Hi Puneet,

The value of the registered timer should be within the startTime and endTime of
your job. For example, the job starts at processing time t1 and stops at
processing time t2. You have to make sure t1 < `parseLong + 5000` < t2.

Best, Hequn
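
If the intent is simply "fire about 5 seconds from now", one way to guarantee the timestamp lies inside that interval is to register the timer relative to the current processing time instead of the parsed field; a tiny sketch of that variant (an illustration, not the code from the thread):

// inside processElement(...)
long now = ctx.timerService().currentProcessingTime();
// fires roughly 5 seconds after this element is processed, while the job is still running
ctx.timerService().registerProcessingTimeTimer(now + 5000);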

On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi All
>
> Facing some issue with context to onTimer method in processfunction
>
> class TimerTest extends ProcessFunction,String>{
>
> /**
> *
> */
> private static final long serialVersionUID = 1L;
>
> @Override
> public void processElement(Tuple2 arg0,
> ProcessFunction, String>.Context ctx,
> Collector arg2) throws Exception {
> // TODO Auto-generated method stub
> long parseLong = Long.parseLong(arg0.f1);
> TimerService timerService = ctx.timerService();
> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
> }
>
> @Override
> public void onTimer(long timestamp, ProcessFunction String>, String>.OnTimerContext ctx,
> Collector out) throws Exception {
> // TODO Auto-generated method stub
> super.onTimer(timestamp, ctx, out);
> System.out.println("Executing timmer"+timestamp);
> out.collect("Timer Testing..");
> }
> }
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


Re: Unable to restore the checkpoint on restarting the application!!

2019-01-07 Thread Till Rohrmann
Hi Puneet,

if context.isRestored returns false, then Flink did not resume from a
checkpoint/savepoint. Please make sure that you specify the correct path
the an existing checkpoint.

Cheers,
Till
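
For context, a minimal sketch of where that flag is typically consulted, assuming a CheckpointedFunction (the class and state names are illustrative):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class CountingFunction implements CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;
    private long count;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));

        // true only when the job was started from a checkpoint/savepoint,
        // e.g. via `bin/flink run -s <path>`; always false for a fresh run
        // from the IDE's LocalEnvironment
        if (context.isRestored()) {
            for (Long c : checkpointedCount.get()) {
                count += c;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }
}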

On Mon, Jan 7, 2019 at 11:04 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi Till
>
> Its Working for me know ,but *context.isRestored() **is always returning
> false.*
>
> On Fri, Jan 4, 2019 at 7:42 PM Till Rohrmann  wrote:
>
>> When starting a job from within the IDE using the LocalEnvironment, it is
>> not possible to specify a checkpoint from which to resume. That's why your
>> recovered state is empty. Flink won't automatically pick up the latest
>> checkpoint it finds in some checkpoint directory.
>>
>> You can test it though by starting a local standalone cluster and submit
>> the job via bin/flink run -s  job.jar [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#restore-a-savepoint
>>
>> Cheers,
>> Till
>>
>> On Fri, Jan 4, 2019 at 2:49 PM Puneet Kinra <
>> puneet.ki...@customercentria.com> wrote:
>>
>>> The List it returns is blank
>>>
>>> On Fri, Jan 4, 2019 at 7:15 PM Till Rohrmann 
>>> wrote:
>>>
 Hi Puneet,

 what exactly is the problem when you try to resume from a checkpoint?

 Cheers,
 Till

 On Fri, Jan 4, 2019 at 2:31 PM Puneet Kinra <
 puneet.ki...@customercentria.com> wrote:

> Hi All
>
> I am creating a poc where i am trying the out of box feature of flink
> for managed state of operator . I am able to create the checkpoint
> while running my app in eclipse but when i am trying to restart the app . 
> I
> am unable to restore
> the state.
>
> Please find attached below snippet.
>
> step followed
> 1) ran the application that generate tuple automatically.
> 2) check-pointing is triggering as  it configured.(able to see the
> data being written in files)
> 3) stopped the app in eclipse
> 4) restart the application (unable to restore)
>
>
>
> --
> *Cheers *
>
> *Puneet*
>

>>>
>>> --
>>> *Cheers *
>>>
>>> *Puneet Kinra*
>>>
>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>> *
>>>
>>> *e-mail :puneet.ki...@customercentria.com
>>> *
>>>
>>>
>>>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-07 Thread Till Rohrmann
Hi Wenrui,

the code to set the connect timeout looks ok to me [1]. I also tested it
locally and checked that the timeout is correctly registered in Netty's
AbstractNioChannel [2].

Increasing the number of threads to 128 should not be necessary. But it
could indicate that there is some long lasting or blocking operation being
executed by the threads.

How does the job submission and cluster configuration work with AthenaX?
Will the platform spawn for each job a new Flink cluster for which you can
specify the cluster configuration?

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java#L102
[2]
https://github.com/netty/netty/blob/netty-4.0.27.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java#L207

Cheers,
Till

On Sat, Jan 5, 2019 at 2:22 AM Wenrui Meng  wrote:

> Hi Till,
>
> Thanks for your reply and help on this issue.
>
> I increased taskmanager.network.netty.client.connectTimeoutSec to 1200
> which is 20 minutes. But it seems the connection not respects this timeout.
> In addition, I increase both taskmanager.network.request-backoff.max
> and taskmanager.registration.max-backoff to 20min.
>
> One thing I found is helpful to some extent is increasing
> the taskmanager.network.netty.server.numThreads. I increase it to 128
> threads, it can succeed sometimes. But keep increasing it doesn't solve the
> problem. We have 100 parallel intermediate results, so there are too many
> partition requests. I think that's why it timeout. The solution should let
> the connection timeout increase. But I think there is some issue that
> connection doesn't respect the timeout config.
>
> We will definitely try the latest flink version. But at Uber, there is a
> team who is responsible to provide a platform with Flink. They will upgrade
> it at the end of this Month. Meanwhile, I would like to ask some help to
> investigate how to increase the connection timeout and make it respected.
>
> Thanks,
> Wenrui
>
> On Fri, Jan 4, 2019 at 5:27 AM Till Rohrmann  wrote:
>
>> Hi Wenrui,
>>
>> from the logs I cannot spot anything suspicious. Which configuration
>> parameters have you changed exactly? Does the JobManager log contain
>> anything suspicious?
>>
>> The current Flink version changed quite a bit wrt 1.4. Thus, it might be
>> worth a try to run the job with the latest Flink version.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jan 3, 2019 at 3:00 PM Wenrui Meng  wrote:
>>
>>> Hi,
>>>
>>> I consistently get connection timeout issue when creating
>>> partitionRequestClient in flink 1.4. I tried to ping from the connecting
>>> host to the connected host, but the ping latency is less than 0.1 ms
>>> consistently. So it's probably not due to the cluster status. I also tried
>>> increase max backoff, nettowrk timeout and some other setting, it doesn't
>>> help.
>>>
>>> I enabled the debug log of flink but not find any suspicious or useful
>>> information to help me fix the issue. Here is the link
>>> 
>>> of the jobManager and taskManager logs. The connecting host is the host
>>> which throw the exception. The connected host is the host the connecting
>>> host try to request partition from.
>>>
>>> Since our platform is not up to date yet, the flink version I used in
>>> this is 1.4. But I noticed that there is not much change of these code on
>>> the Master branch. Any help will be appreciated.
>>>
>>> Here is stack trace of the exception
>>>
>>> from RUNNING to FAILED.
>>> java.io.IOException: Connecting the channel failed: Connecting to remote
>>> task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This
>>> might indicate that the remote task manager has been lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
>>> at
>>> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)

Re: Unable to restore the checkpoint on restarting the application!!

2019-01-07 Thread Puneet Kinra
Hi Till

It's working for me now, but context.isRestored() is always returning
false.

On Fri, Jan 4, 2019 at 7:42 PM Till Rohrmann  wrote:

> When starting a job from within the IDE using the LocalEnvironment, it is
> not possible to specify a checkpoint from which to resume. That's why your
> recovered state is empty. Flink won't automatically pick up the latest
> checkpoint it finds in some checkpoint directory.
>
> You can test it though by starting a local standalone cluster and submit
> the job via bin/flink run -s  job.jar [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#restore-a-savepoint
>
> Cheers,
> Till
>
> On Fri, Jan 4, 2019 at 2:49 PM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> The List it returns is blank
>>
>> On Fri, Jan 4, 2019 at 7:15 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Puneet,
>>>
>>> what exactly is the problem when you try to resume from a checkpoint?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Jan 4, 2019 at 2:31 PM Puneet Kinra <
>>> puneet.ki...@customercentria.com> wrote:
>>>
 Hi All

 I am creating a poc where i am trying the out of box feature of flink
 for managed state of operator . I am able to create the checkpoint
 while running my app in eclipse but when i am trying to restart the app . I
 am unable to restore
 the state.

 Please find attached below snippet.

 step followed
 1) ran the application that generate tuple automatically.
 2) check-pointing is triggering as  it configured.(able to see the data
 being written in files)
 3) stopped the app in eclipse
 4) restart the application (unable to restore)



 --
 *Cheers *

 *Puneet*

>>>
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> *
>>
>> *e-mail :puneet.ki...@customercentria.com
>> *
>>
>>
>>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: Buffer stats when Back Pressure is high

2019-01-07 Thread zhijiang
Hi Gagan,

What Flink version do you use? And have you checked buffers.inputQueueLength 
for all the related parallel instances of B (connected with A)? There may be a 
scenario where only one parallel instance of B is full of in-queue buffers, 
which back-pressures A, while the input queues of the other parallel instances 
of B are empty.

Best,
Zhijiang


--
From: Gagan Agrawal 
Send Time: Monday, January 7, 2019, 12:06
To: user 
Subject: Buffer stats when Back Pressure is high

Hi,
I want to understand whether any of the buffer stats help in debugging / validating 
that a downstream operator is performing slowly when back pressure is high. Say I 
have A -> B operators and A shows high back pressure, which indicates something 
wrong or not performing well on the B side that is slowing down operator A. 
However, when I look at buffers.inputQueueLength for operator B, it's 0. My 
understanding is that when B is processing slowly, its input buffer will be full 
of incoming messages, which ultimately blocks/slows down the upstream operator A. 
However, that doesn't seem to be happening in my case. Can someone throw some 
light on how the different stats around buffers (e.g. buffers.inPoolUsage, 
buffers.inputQueueLength, numBuffersInLocalPerSecond, 
numBuffersInRemotePerSecond) should look when the downstream operator is performing 
slowly?

Gagan



onTimer function is not getting executed and job is marked as finished.

2019-01-07 Thread Puneet Kinra
Hi All

Facing some issue with respect to the onTimer method in a ProcessFunction:

class TimerTest extends ProcessFunction<Tuple2<String, String>, String> {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void processElement(Tuple2<String, String> arg0,
ProcessFunction<Tuple2<String, String>, String>.Context ctx,
Collector<String> arg2) throws Exception {
// TODO Auto-generated method stub
long parseLong = Long.parseLong(arg0.f1);
TimerService timerService = ctx.timerService();
ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
}

@Override
public void onTimer(long timestamp, ProcessFunction<Tuple2<String, String>,
String>.OnTimerContext ctx,
Collector<String> out) throws Exception {
// TODO Auto-generated method stub
super.onTimer(timestamp, ctx, out);
System.out.println("Executing timmer"+timestamp);
out.collect("Timer Testing..");
}
}

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Building Flink from source according to vendor-specific version but causes protobuf conflict

2019-01-07 Thread Wei Sun
Hi guys,


Good day.


I rebuilt Flink from the source and specified the vendor-specific Hadoop 
version. It works well when I just submit a streaming application without 
the '-d' (--detached) option, as follows:
bin/flink run -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096 
-ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter 
./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf


But if I add the '-d' (--detached) option, an 
'org.apache.flink.client.deployment.ClusterDeploymentException' will be thrown 
out to the CLI. Just as:
bin/flink run -d -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096 
-ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter 
./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf


Exception 
start
 The program finished with the following exception:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy 
Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: 
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1544777537685_0068 failed 2 
times due to AM Container for appattempt_1544777537685_0068_02 exited with  
exitCode: 1
For more detailed output, check application tracking 
page:http://103-8-200-sh-100-F07.yidian.com:8088/proxy/application_1544777537685_0068/Then,
 click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e03_1544777537685_0068_02_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
at org.apache.hadoop.util.Shell.run(Shell.java:460)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:
yarn logs -applicationId application_1544777537685_0068
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1065)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
... 9 more
2019-01-07 17:08:55,463 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cancelling 
deployment from Deployment Failure Hook
2019-01-07 17:08:55,464 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Killing YARN 
application
2019-01-07 17:08:55,471 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deleting files 
in 

---End--EndEnd-


My cluster has enable the log aggregation, so I executed the following command:
yarn logs -applicationId application_1544777537685_0068, the detail about the 
log shows that
-Exception 
start--Exception 
start---

Passing vm options

2019-01-07 Thread Avi Levi
Hi ,
I am trying to pass some VM options, e.g.
bin/flink run foo-assembly-0.0.1-SNAPSHOT.jar -Dflink.stateDir=file:///tmp/
-Dkafka.bootstrap.servers="localhost:9092"
-Dkafka.security.ssl.enabled=false
but it doesn't seem to override the values in application.conf. Am I
missing something?
BTW, is it possible to pass a config file using -Dconfig.file?

BR
Avi