Re: Passing vm options

2019-01-08 Thread Avi Levi
Got it. Thanks

On Mon, Jan 7, 2019 at 5:32 PM Dominik Wosiński  wrote:

> Hey,
> AFAIK, Flink currently supports dynamic properties only on YARN, not
> in standalone mode.
> If you are using YARN, it should indeed be possible to set such
> configuration. If not, then I am afraid it is not possible.
>
> Best Regards,
> Dom.
>
>
> On Mon, 7 Jan 2019 at 09:01, Avi Levi wrote:
>
>> Hi ,
>> I am trying to pass some VM options, e.g.
>> bin/flink run foo-assembly-0.0.1-SNAPSHOT.jar
>> -Dflink.stateDir=file:///tmp/ -Dkafka.bootstrap.servers="localhost:9092"
>> -Dkafka.security.ssl.enabled=false
>> but it doesn't seem to override the values in application.conf. Am I
>> missing something?
>> BTW, is it possible to pass a config file using -Dconfig.file?
>>
>> BR
>> Avi
>>
>
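For reference, on YARN the dynamic properties Dominik mentions are passed
with -yD. A sketch of forwarding JVM system properties (such as the
config.file override asked about above) to the task manager JVMs via the
env.java.opts option — the path is illustrative:

bin/flink run -m yarn-cluster \
  -yD env.java.opts="-Dconfig.file=/path/to/application.conf" \
  foo-assembly-0.0.1-SNAPSHOT.jar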


Re: Counter Metrics not getting removed from Flink GUI after close()

2019-01-08 Thread Gaurav Luthra
Hi Chesnay,

I do not want to store the metric counter in a reference variable, because I
want to create a metric counter for every key of the keyed stream.

There can be n keys, and I do not want to hold n references.
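A sketch of one way to do this with a single member variable — one transient
map from key to Counter (class and names hypothetical). Each counter name is
registered exactly once, which also avoids the "same name" warning discussed
below:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Counts records per key while holding only one field, not one per key.
public class PerKeyCountingMapper extends RichMapFunction<String, String> {

    private transient Map<String, Counter> countersByKey;

    @Override
    public void open(Configuration parameters) {
        countersByKey = new HashMap<>();
    }

    @Override
    public String map(String key) {
        // Register the counter on first sight of the key, reuse it afterwards.
        countersByKey
                .computeIfAbsent(key,
                        k -> getRuntimeContext().getMetricGroup().counter(k))
                .inc();
        return key;
    }
}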

On Tue, 8 Jan 2019, 11:01 PM Chesnay Schepler wrote:

> What you're trying to do is not possible. Even if you close the group *it
> still exists*, and is returned by subsequent calls to addGroup("mygroup").
> However, since it is closed, all registration calls will be ignored, hence
> why the value isn't updating.
>
> You can only update a metric by storing a reference to it in your function.
> Why do you want to avoid the member variable?
>
> On 08.01.2019 17:24, Gaurav Luthra wrote:
>
> Hi Chesnay,
>
> If removing metrics from the Flink GUI is not possible while the job is
> running, then kindly tell me how to update a metric counter.
>
> Explanation:
> Suppose I created a metric counter with key "chesnay" and incremented the
> counter to 20 with the code below.
> getRuntimeContext().getMetricGroup().counter("chesnay").inc(20);
>
> /Note: I am not assigning this counter to any local/member variable, as I do
> not want to keep state in my job./
>
> Now, after some time, I want to update the value of the "chesnay" metric
> counter to 60, and I am not aware of the old state (which is 20).
>
> So, if I do getRuntimeContext().getMetricGroup().counter("chesnay").inc(60);
>
> even then the Flink GUI shows value 20 for the "chesnay" metric, and gives a
> WARN log along the lines of "same name can be used, and behavior is
> undefined".
>
> Now, how do I update the "chesnay" metric if I do not want to keep the
> state in my job?
>
> That's why I thought of creating a user-scoped metric group and closing
> that group, to remove the metric counters and create new metrics every
> time I want to update them.
>
> Hope you understood my problem.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: The way to write a UDF with generic type

2019-01-08 Thread yinhua.dai
Got it, thanks.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Subtask much slower than the others when creating checkpoints

2019-01-08 Thread Till Rohrmann
Hi Bruno,

there are multiple reasons why one of the subtasks can take longer for
checkpointing. It looks as if there is not much data skew, since the state
sizes are relatively equal. It also looks as if the individual tasks all
start checkpointing at the same time, which indicates that there cannot be
a lot of back-pressure in the DAG (or all tasks were equally
back-pressured). This narrows the problem cause down to the asynchronous
write operation. One potential problem could be if the external system to
which you write your checkpoint data has some kind of I/O limit/quota.
Maybe the sum of write accesses depletes the maximum quota you have. You
could check whether running the job with a lower parallelism solves the
problem.

For further debugging it could be helpful to get access to the logs of the
JobManager and the TaskManagers on DEBUG log level. It could also be
helpful to learn which state backend you are using.

Cheers,
Till
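As a concrete starting point for the DEBUG logs Till mentions, the root log
level can be raised in conf/log4j.properties on the JobManager and
TaskManager machines; a sketch, assuming the log4j setup that ships with
Flink (restart the processes so they pick it up):

# conf/log4j.properties — change the default INFO level to DEBUG
log4j.rootLogger=DEBUG, file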

On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda  wrote:

> Hi,
>
> We are using Flink 1.6.1 at the moment and we have a streaming job
> configured to create a checkpoint every 10 seconds. Looking at the
> checkpointing times in the UI, we can see that one subtask is much slower
> creating the checkpoint, at least in its "End to End Duration", and this
> seems to be caused by a longer "Checkpoint Duration (Async)".
>
> For instance, in the attached screenshot, while most of the subtasks take
> half a second, one (and it is always one) takes 2 seconds.
>
> But we have worse problems. We have seen cases where the checkpoint times
> out for one task: while most take one second, the outlier takes more than
> 5 minutes (which is the max time we allow for a checkpoint). This can
> happen if there is back pressure. We also only allow one checkpoint at a
> time.
>
> Why could one subtask take more time? This job reads from Kafka partitions
> and hashes by key, and we don't see any major data skew between the
> partitions. Does one partition do more work?
>
> We do have a cluster of 20 machines, in EMR, with TMs that have multiple
> slots (in legacy mode).
>
> Is this something that could have been fixed in a more recent version?
>
> Thanks for any insight!
>
> Bruno
>
>
>


Re: Counter Metrics not getting removed from Flink GUI after close()

2019-01-08 Thread Chesnay Schepler
What you're trying to do is not possible. Even if you close the group 
/it still exists/, and is returned by subsequent calls to 
addGroup("mygroup").
However, since it is closed, all registration calls will be ignored, hence
why the value isn't updating.


You can only update a metric by storing a reference to it in your function.
Why do you want to avoid the member variable?
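The member-variable pattern Chesnay refers to, as a minimal sketch (class
name and counter name are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Register the counter once in open(), keep the reference, update it later.
public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter counter;

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getMetricGroup().counter("chesnay");
    }

    @Override
    public String map(String value) {
        counter.inc();  // this update is reflected in the web UI
        return value;
    }
}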

On 08.01.2019 17:24, Gaurav Luthra wrote:

Hi Chesnay,

If removing metrics from the Flink GUI is not possible while the job is
running, then kindly tell me how to update a metric counter.

Explanation:
Suppose I created a metric counter with key "chesnay" and incremented the
counter to 20 with the code below.
getRuntimeContext().getMetricGroup().counter("chesnay").inc(20);

/Note: I am not assigning this counter to any local/member variable, as I do
not want to keep state in my job./

Now, after some time, I want to update the value of the "chesnay" metric
counter to 60, and I am not aware of the old state (which is 20).

So, if I do getRuntimeContext().getMetricGroup().counter("chesnay").inc(60);

even then the Flink GUI shows value 20 for the "chesnay" metric, and gives a
WARN log along the lines of "same name can be used, and behavior is
undefined".

Now, how do I update the "chesnay" metric if I do not want to keep the
state in my job?

That's why I thought of creating a user-scoped metric group and closing
that group, to remove the metric counters and create new metrics every
time I want to update them.

Hope you understood my problem.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Counter Metrics not getting removed from Flink GUI after close()

2019-01-08 Thread Gaurav Luthra
Hi Chesnay,

If removing metrics from the Flink GUI is not possible while the job is
running, then kindly tell me how to update a metric counter.

Explanation:
Suppose I created a metric counter with key "chesnay" and incremented the
counter to 20 with the code below.
getRuntimeContext().getMetricGroup().counter("chesnay").inc(20);

/Note: I am not assigning this counter to any local/member variable, as I do
not want to keep state in my job./

Now, after some time, I want to update the value of the "chesnay" metric
counter to 60, and I am not aware of the old state (which is 20).

So, if I do getRuntimeContext().getMetricGroup().counter("chesnay").inc(60);

even then the Flink GUI shows value 20 for the "chesnay" metric, and gives a
WARN log along the lines of "same name can be used, and behavior is
undefined".

Now, how do I update the "chesnay" metric if I do not want to keep the
state in my job?

That's why I thought of creating a user-scoped metric group and closing
that group, to remove the metric counters and create new metrics every
time I want to update them.

Hope you understood my problem.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Counter Metrics not getting removed from Flink GUI after close()

2019-01-08 Thread Chesnay Schepler
Metrics for a given job will be available in the GUI until the Job has 
finished.


On 08.01.2019 17:08, Gaurav Luthra wrote:

Hi,

I am using a ProcessWindowFunction, and in the process() function I am
adding a user-scoped group as shown below.
MetricGroup myMetricGroup =
getRuntimeContext().getMetricGroup().addGroup("myGroup");

Now, I am creating counter metrics using myMetricGroup, and I am
able to see these counters in the Flink GUI.

But when I call close() as shown below,
((AbstractMetricGroup) myMetricGroup).close();

even then my counter metrics are not removed from the Flink GUI.

Kindly guide me on how to close a user-scoped metric group (myMetricGroup in
my case) so that all the counter metrics created using myMetricGroup
are removed from the Flink GUI.



Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206





Counter Metrics not getting removed from Flink GUI after close()

2019-01-08 Thread Gaurav Luthra
Hi,

I am using a ProcessWindowFunction, and in the process() function I am adding
a user-scoped group as shown below.
MetricGroup myMetricGroup = getRuntimeContext().getMetricGroup().addGroup(
"myGroup");

Now, I am creating counter metrics using myMetricGroup, and I am able to
see these counters in the Flink GUI.
But when I call close() as shown below,
((AbstractMetricGroup) myMetricGroup).close();

even then my counter metrics are not removed from the Flink GUI.

Kindly guide me on how to close a user-scoped metric group (myMetricGroup in
my case) so that all the counter metrics created using myMetricGroup are
removed from the Flink GUI.


Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206


Re: onTimer function is not getting executed and job is marked as finished.

2019-01-08 Thread Puneet Kinra
Sure, I will do that.

On Tue, Jan 8, 2019 at 7:25 PM Hequn Cheng  wrote:

> Hi Puneet,
>
> Can you explain it in more detail? Do you mean the job is finished before
> you call ctx.timerService()?
> Maybe you have to let your source run for a longer time.
>
> It's better to show us the whole pipeline of your job. For example, write
> sample code (or provide a git link) that reproduces your problem easily.
>
> Best, Hequn
>
>
> On Tue, Jan 8, 2019 at 11:44 AM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Hi Hequn,
>>
>> Weird behaviour: when I call ctx.timerService(), the function exits
>> without even throwing an error.
>>
>> On Tuesday, January 8, 2019, Hequn Cheng  wrote:
>>
>>> Hi puneet,
>>>
>>> Could you print `parseLong + 5000` and
>>> `ctx.timerService().currentProcessingTime()` out and check the value?
>>> I know it is a streaming program. What I mean is the timer you have
>>> registered is not within the interval of your job, so the timer has not
>>> been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 =
>>> 1000(very big).
>>>
>>> Best, Hequn
>>>
>>>
>>> On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra <
>>> puneet.ki...@customercentria.com> wrote:
>>>
 I checked the same: the function exits when I call the
 ctx.timerService() function.

 On Mon, Jan 7, 2019 at 10:27 PM Timo Walther 
 wrote:

> Hi Puneet,
>
> maybe you can show or explain to us a bit more about your pipeline. From
> what I see, your ProcessFunction looks correct. Are you sure the
> registering takes place?
>
> Regards,
> Timo
>
> On 07.01.19 14:15, Puneet Kinra wrote:
>
> Hi Hequn
>
> It's a streaming job.
>
> On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng 
> wrote:
>
>> Hi Puneet,
>>
>> The value of the registered timer should be within the start time and end
>> time of your job. For example, the job starts at processing time t1 and
>> stops at processing time t2. You have to make sure t1 < `parseLong + 5000` < t2.
>>
>> Best, Hequn
>>
>> On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <
>> puneet.ki...@customercentria.com> wrote:
>>
>>> Hi All
>>>
>>> Facing an issue with the onTimer method in ProcessFunction:
>>>
>>> class TimerTest extends
>>> ProcessFunction<Tuple2<String, String>, String> {
>>>
>>> private static final long serialVersionUID = 1L;
>>>
>>> @Override
>>> public void processElement(Tuple2<String, String> arg0,
>>> ProcessFunction<Tuple2<String, String>, String>.Context ctx,
>>> Collector<String> arg2) throws Exception {
>>> long parseLong = Long.parseLong(arg0.f1);
>>> TimerService timerService = ctx.timerService();
>>> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
>>> }
>>>
>>> @Override
>>> public void onTimer(long timestamp,
>>> ProcessFunction<Tuple2<String, String>, String>.OnTimerContext ctx,
>>> Collector<String> out) throws Exception {
>>> super.onTimer(timestamp, ctx, out);
>>> System.out.println("Executing timer " + timestamp);
>>> out.collect("Timer Testing..");
>>> }
>>> }
>>>
>>>
>>>
>>>
>
>
>
>
>




>>
>>
>>
>>
>>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-08 Thread Till Rohrmann
Hi Wenrui,

the exception now occurs while finishing the connection creation; I'm not
sure whether this is really a different cause. Could it be that your network
is overloaded or not very reliable? Have you tried running your Flink job
outside of AthenaX?

Cheers,
Till
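For anyone tuning this, the two knobs discussed in this thread are plain
flink-conf.yaml settings; a sketch (the values are illustrative, not
recommendations):

# raise the connect timeout and the netty server thread count
taskmanager.network.netty.client.connectTimeoutSec: 300
taskmanager.network.netty.server.numThreads: 128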

On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng  wrote:

> Hi Till,
>
> Thanks for your reply. Our cluster is a YARN cluster. I found that if we
> decrease the total parallelism, the timeout issue can be avoided. But we do
> need that number of task managers to process data. In addition, once I
> increased the netty server threads to 128, the error changed to the
> following. It seems the cause is different. Could you help take a
> look?
>
> 2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
> java.io.IOException: Connecting the channel failed: Connecting to remote
> task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might
> indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
> at
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
> at
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
> at
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
> at
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466'
> has failed. This might indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> ... 1 common frames omitted
> Caused by: java.net.ConnectException: Connection timed out: athena464-sjc1/
> 10.70.129.13:39466
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> 

Re: onTimer function is not getting executed and job is marked as finished.

2019-01-08 Thread Hequn Cheng
Hi Puneet,

Can you explain it in more detail? Do you mean the job is finished before
you call ctx.timerService()?
Maybe you have to let your source run for a longer time.

It's better to show us the whole pipeline of your job. For example, write
sample code (or provide a git link) that reproduces your problem easily.

Best, Hequn
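As a concrete illustration of the interval point above, registering the
timer relative to the current processing time guarantees it fires while the
job is still running; a fragment for inside processElement() (the 5000 ms
offset is illustrative):

// register a timer 5 seconds from "now" instead of from an input value
long now = ctx.timerService().currentProcessingTime();
ctx.timerService().registerProcessingTimeTimer(now + 5000);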


On Tue, Jan 8, 2019 at 11:44 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi Hequn,
>
> Weird behaviour: when I call ctx.timerService(), the function exits
> without even throwing an error.
>
> On Tuesday, January 8, 2019, Hequn Cheng  wrote:
>
>> Hi puneet,
>>
>> Could you print `parseLong + 5000` and
>> `ctx.timerService().currentProcessingTime()` out and check the value?
>> I know it is a streaming program. What I mean is the timer you have
>> registered is not within the interval of your job, so the timer has not
>> been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 =
>> 1000(very big).
>>
>> Best, Hequn
>>
>>
>> On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra <
>> puneet.ki...@customercentria.com> wrote:
>>
>>> I checked the same: the function exits when I call the
>>> ctx.timerService() function.
>>>
>>> On Mon, Jan 7, 2019 at 10:27 PM Timo Walther  wrote:
>>>
 Hi Puneet,

 maybe you can show or explain to us a bit more about your pipeline. From
 what I see, your ProcessFunction looks correct. Are you sure the registering
 takes place?

 Regards,
 Timo

 On 07.01.19 14:15, Puneet Kinra wrote:

 Hi Hequn

 It's a streaming job.

 On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng 
 wrote:

> Hi Puneet,
>
> The value of the registered timer should be within the start time and end
> time of your job. For example, the job starts at processing time t1 and
> stops at processing time t2. You have to make sure t1 < `parseLong + 5000` < t2.
>
> Best, Hequn
>
> On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Hi All
>>
>> Facing an issue with the onTimer method in ProcessFunction:
>>
>> class TimerTest extends ProcessFunction<Tuple2<String, String>, String> {
>>
>> private static final long serialVersionUID = 1L;
>>
>> @Override
>> public void processElement(Tuple2<String, String> arg0,
>> ProcessFunction<Tuple2<String, String>, String>.Context ctx,
>> Collector<String> arg2) throws Exception {
>> long parseLong = Long.parseLong(arg0.f1);
>> TimerService timerService = ctx.timerService();
>> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
>> }
>>
>> @Override
>> public void onTimer(long timestamp,
>> ProcessFunction<Tuple2<String, String>, String>.OnTimerContext ctx,
>> Collector<String> out) throws Exception {
>> super.onTimer(timestamp, ctx, out);
>> System.out.println("Executing timer " + timestamp);
>> out.collect("Timer Testing..");
>> }
>> }
>>
>>
>>
>>





>>>
>>>
>>>
>>>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>
>


Re: How to get the temp result of each dynamic table when executing Flink-SQL?

2019-01-08 Thread Hequn Cheng
Hi,

A print user-defined table sink is helpful. I think a print user-defined
UDF is another workaround.
Hope this helps.

Best, Hequn
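A minimal sketch of the print-UDF workaround (class and message text are
hypothetical; registered once on the TableEnvironment, it can wrap any
column of the intermediate table you want to inspect):

import org.apache.flink.table.functions.ScalarFunction;

// Pass-through scalar function that prints every value flowing through it.
public class PrintUdf extends ScalarFunction {
    public String eval(String value) {
        System.out.println("intermediate value: " + value);
        return value;
    }
}

// usage sketch: tableEnv.registerFunction("printIt", new PrintUdf());
// then select printIt(someColumn) in the query whose interim result
// you want to see.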

On Tue, Jan 8, 2019 at 1:45 PM yinhua.dai  wrote:

> In our case, we wrote a console table sink which prints everything on the
> console, and used "insert into" to write the interim result to the console.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-08 Thread Wenrui Meng
Hi Till,

Thanks for your reply. Our cluster is a YARN cluster. I found that if we
decrease the total parallelism, the timeout issue can be avoided. But we do
need that number of task managers to process data. In addition, once I
increased the netty server threads to 128, the error changed to the
following. It seems the cause is different. Could you help take a
look?

2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote
task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might
indicate that the remote task manager has been lost.
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
at
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
at
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
at
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
at
org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has
failed. This might indicate that the remote task manager has been lost.
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: athena464-sjc1/
10.70.129.13:39466
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
... 6 common frames omitted


Thanks,
Wenrui

On Mon, Jan 7, 2019 at 2:38 AM Till Rohrmann  wrote:

> Hi Wenrui,
>
> the code to set the connect timeout looks ok to me [1]. I also tested it
> locally and checked that the 

Subtask much slower than the others when creating checkpoints

2019-01-08 Thread Bruno Aranda
Hi,

We are using Flink 1.6.1 at the moment and we have a streaming job
configured to create a checkpoint every 10 seconds. Looking at the
checkpointing times in the UI, we can see that one subtask is much slower
creating the checkpoint, at least in its "End to End Duration", and this
seems to be caused by a longer "Checkpoint Duration (Async)".

For instance, in the attached screenshot, while most of the subtasks take
half a second, one (and it is always one) takes 2 seconds.

But we have worse problems. We have seen cases where the checkpoint times
out for one task: while most take one second, the outlier takes more than
5 minutes (which is the max time we allow for a checkpoint). This can
happen if there is back pressure. We also only allow one checkpoint at a
time.

Why could one subtask take more time? This job reads from Kafka partitions
and hashes by key, and we don't see any major data skew between the
partitions. Does one partition do more work?

We do have a cluster of 20 machines, in EMR, with TMs that have multiple
slots (in legacy mode).

Is this something that could have been fixed in a more recent version?

Thanks for any insight!

Bruno


Re: The way to write a UDF with generic type

2019-01-08 Thread Timo Walther
Currently, this functionality is hard-coded in the aggregation translation,
namely in
`org.apache.flink.table.runtime.aggregate.AggregateUtil#transformToAggregateFunctions`
[1].


Timo

[1] 
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
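For user-defined functions (as opposed to the hard-coded built-ins above),
a generic result type can be declared by overriding getResultType; a sketch
for a scalar UDF (the identity function here is just for illustration):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

// Returns its argument unchanged; the result type is derived from the
// class of the actual argument instead of being fixed at compile time.
public class IdentityUdf extends ScalarFunction {
    public Object eval(Object value) {
        return value;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return TypeInformation.of(signature[0]);
    }
}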


On 08.01.19 06:41, yinhua.dai wrote:

Hi Timo,

Can you let me know how the built-in "MAX" function is able to support
different types?
Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Buffer stats when Back Pressure is high

2019-01-08 Thread Gagan Agrawal
Thanks Timo for the suggested solution. Will go with the idea of an
artificial key for our use case.

Gagan

On Mon, Jan 7, 2019 at 10:21 PM Timo Walther  wrote:

> Hi Gagan,
>
> a typical solution to such a problem is to introduce an artificial key
> (enrichment id + some additional suffix); you can then keyBy on this
> artificial key and thus spread the workload more evenly. Of course, you
> need to make sure that records of the second stream are duplicated to all
> operators with the same artificial key (see the sketch below).
>
> Depending on the frequency of the second stream, it might also be worth
> using a broadcast join that distributes the second stream to all operators
> such that all operators can perform the enrichment step in a round-robin
> fashion.
>
> Regards,
> Timo
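A sketch of that artificial-key idea (the Tuple2<enrichmentId, payload>
layout and NUM_SALTS are hypothetical; note the salt is attached before
keyBy, so the key selector itself stays deterministic):

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Spreads a hot enrichment id over NUM_SALTS artificial keys. The
// low-volume enrichment stream must be fanned out to all NUM_SALTS
// suffixes so every salted key still sees its enrichment record.
public class SaltKey implements
        MapFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private static final int NUM_SALTS = 8;

    @Override
    public Tuple2<String, String> map(Tuple2<String, String> record) {
        int salt = ThreadLocalRandom.current().nextInt(NUM_SALTS);
        return Tuple2.of(record.f0 + "#" + salt, record.f1);
    }
}

// usage sketch: highVolumeStream.map(new SaltKey()).keyBy(t -> t.f0)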
>
> On 07.01.19 14:45, Gagan Agrawal wrote:
>
> Flink Version is 1.7.
> Thanks Zhijiang for your pointer. Initially I was checking only a few.
> However, I just checked all of them and found a couple with a queue length
> of 40+, which seems to be due to skewness in the data. Are there any
> general guidelines on how to handle skewed data? In my case I am taking a
> union of 2 streams (1 enrichment stream with low volume and another
> regular data stream with high volume) and then keyBy (with a custom
> stateful ProcessFunction) on the enrichment id. I see that 30% of my data
> stream records have the same enrichment id and hence go to the same task,
> which results in skewness. Any pointers on how to handle skewness while
> doing keyBy would be of great help.
>
> Gagan
>
> On Mon, Jan 7, 2019 at 3:25 PM zhijiang 
> wrote:
>
>> Hi Gagan,
>>
>> What Flink version do you use? And have you checked
>> buffers.inputQueueLength for all the parallel instances of B (connected
>> with A)? There may be a scenario where only one parallel instance of B
>> has a full input queue, which back-pressures A, while the input queues
>> of the other B instances are empty.
>>
>> Best,
>> Zhijiang
>>
>> --
>> From: Gagan Agrawal
>> Send Time: Monday, Jan 7, 2019, 12:06
>> To: user
>> Subject: Buffer stats when Back Pressure is high
>>
>> Hi,
>> I want to understand whether any of the buffer stats help in debugging /
>> validating that a downstream operator is performing slowly when back
>> pressure is high. Say I have operators A -> B, and A shows high back
>> pressure, which indicates something is wrong or underperforming on B's
>> side and is slowing down operator A. However, when I look at
>> buffers.inputQueueLength for operator B, it's 0. My understanding is that
>> when B is processing slowly, its input buffer will be full of incoming
>> messages, which ultimately blocks/slows down upstream operator A. However,
>> that doesn't seem to be happening in my case. Can someone shed some light
>> on what the different buffer stats (e.g. buffers.inPoolUsage,
>> buffers.inputQueueLength, numBuffersInLocalPerSecond,
>> numBuffersInRemotePerSecond) should look like when the downstream
>> operator is performing slowly?
>>
>> Gagan
>>
>>
>>
>