Re: 远程提交代码到Flink集群

2019-03-28 Thread Lifei Chen
有一个小巧的go cli, 支持直接部署jar包到flink manager上。

https://github.com/ing-bank/flink-deployer

希望能帮到你!

Kaibo Zhou  于2019年3月29日周五 上午11:08写道:

> 可以用 flink 提供的 Restful API 接口,upload 上传 jar 包然后 run。
>
> 参考:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-upload
> 和 https://files.alicdn.com/tpsservice/a8d224d6a3b8b82d03aa84e370c008cc.pdf
> 文档的介绍
>
> 文报 <1010467...@qq.com> 于2019年3月28日周四 下午9:06写道:
>
> > 各位好!
> >
> >
> 请教一下各位,在本地开发完代码后,怎么样可以将编写好的代码直接提交到Flink集群上运行?(想做Flink任务的自动化,避免每次开发完成后都需要打jar包提交到web页面上)
>


Does flink configuration support configed by environment variables?

2019-03-28 Thread Lifei Chen
Hi guys,

I am using flink 1.7.2 deployed by kubernetes,  and I want to change the
configurations about flink,  for example customize
`taskmanager.heap.size`.

Does flink support using environment variables to override configurations
in `conf/flink-conf.yaml` ?


Re: 远程提交代码到Flink集群

2019-03-28 Thread Kaibo Zhou
可以用 flink 提供的 Restful API 接口,upload 上传 jar 包然后 run。

参考:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-upload
和 https://files.alicdn.com/tpsservice/a8d224d6a3b8b82d03aa84e370c008cc.pdf
文档的介绍

文报 <1010467...@qq.com> 于2019年3月28日周四 下午9:06写道:

> 各位好!
>
>  
> 请教一下各位,在本地开发完代码后,怎么样可以将编写好的代码直接提交到Flink集群上运行?(想做Flink任务的自动化,避免每次开发完成后都需要打jar包提交到web页面上)


请教一下Blink资源分配问题

2019-03-28 Thread 邓成刚【qq】
请教一下Blink资源分配问题:
blink 任务并行度设置 20  提示0个满足:Batch request 40 slots, but only 0 are fulfilled.
调整到 3 并行度 提示:Batch request 6 slots, but only 4 are fulfilled.,
但是我的TASK SLOTS有配 48,没有其它任务,
按理説没有资源问题啊,集群配置情况:

其它的都是默认配置:

taskmanager.numberOfTaskSlots: 24

jobmanager.heap.size: 20480m

# The heap size for the TaskManager JVM

taskmanager.heap.size: 40960m


服务器 2 台:每台 48核,256G




Re: blink开源版本维表关联时开启缓存方式

2019-03-28 Thread Kurt Young
Hi,

Blink开源的时候把Cache的实现暂时拿掉了,你可以根据自己的需要自己实现一个cache。

Best,
Kurt


On Wed, Mar 27, 2019 at 4:44 PM 苏 欣  wrote:

> 我在ppt里面看到这些内容,但是在开源的blink里面没有找到相关的配置,请问各位老师应该如何开启缓存策略?
>
>
>
> 发送自 Windows 10 版邮件 应用
>
>
>


Re: AskTimeoutException - Cannot deploy task

2019-03-28 Thread Avi Levi
Hi All,
Following my previous mail below I see the exception below.
I really appreciate any help here
Attached is log files
Looking at the logs we see this exception all around :

2019-03-28 23:51:58,460 WARN  org.apache.kafka.common.network.Selector
 - Unexpected error from
kafka-3.c.mako.internal/10.157.9.103; closing connection
java.lang.IllegalStateException: Buffer overflow when available data
size (16384) >= application buffer size (16384)
at 
org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:470)
at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:85)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
at 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
at 
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)



-- Forwarded message -
From: Avi Levi 
Date: Thu, Mar 28, 2019 at 11:03 PM
Subject: AskTimeoutException - Cannot deploy task
To: user 


Hi,
I see the following exceptions, will really appreciate any help on that

Thanks

Avi


This is the first one (out of three) :


java.lang.Exception: Cannot deploy task KeyedProcess -> Sink: Unnamed
(3/100) (2c9646634afe1488659da404e92697b0) - TaskManager
(container_e03_1553795623823_0001_01_03 @
dataproc-cluster-w-14.XXX (dataPort=45777)) not responding after a
rpcTimeout of 1 ms
at 
org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:624)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://fl...@dataproc-cluster-w-14.c.:38047/user/taskmanager_0#-1921747025]]
after [1 ms]. Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
... 1 more


and this is the second:

rg.apache.flink.util.FlinkException: The assigned slot
container_e03_1553795623823_0001_01_44_3 was removed.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
at 

Re: TUMBLE function with MONTH/YEAR intervals produce milliseconds window unexpectedly

2019-03-28 Thread Vinod Mehra
btw the max DAY window that is allowed is 99 days. After that it blows up
here:
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/SqlIntervalQualifier.java#L371

"SQL validation failed. From line 12, column 19 to line 12, column 36:
Interval field value 100 exceeds precision of DAY(2) field"

Resetting things based on larger windows (month, quarter, year) can be
quite useful. Is there a practical limitation with Flink (state size blows
up?) for not supporting such large windows?

- Vinod


On Thu, Mar 28, 2019 at 3:24 PM Vinod Mehra  wrote:

> Dawid,
>
> After the above change my SQL (that uses TUMBLE(rowtime, INTERVAL '1'
> MONTH)) fails with an error now:
>
> *(testing with org.apache.flink:flink-table_2.11:jar:1.7.1:compile now)*
> org.apache.flink.table.api.TableException: *Only constant window
> intervals with millisecond resolution are supported*.
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsLong$1(DataStreamLogicalWindowAggregateRule.scala:73)
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
> at
> org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
> at
> org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
> at
> org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
> at
> org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
> at
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
> at
> org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:305)
> at
> org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:248)
>
> The same exact syntax works fine for DAY intervals. For example:
> TUMBLE(rowtime, INTERVAL '30' DAY)
>
> Is the same syntax for MONTH / YEAR intervals not supported?
> TUMBLE(rowtime, INTERVAL '1' MONTH)
> TUMBLE(rowtime, INTERVAL '1' YEAR)
>
> Thanks,
> Vinod
>
> On Thu, Mar 28, 2019 at 12:46 PM Dawid Wysakowicz 
> wrote:
>
>> It should be fixed since version 1.6.3.
>> Best,
>> Dawid
>>
>>
>> [1]
>> https://issues.apache.org/jira/browse/FLINK-11017?jql=project%20%3D%20FLINK%20AND%20text%20~%20Month
>>
>>
>> On Thu, 28 Mar 2019, 19:32 Vinod Mehra,  wrote:
>>
>>> Hi All!
>>>
>>> We are using: org.apache.flink:flink-table_2.11:jar:1.4.2:compile
>>>
>>> SELECT
>>>   COALESCE(user_id, -1) AS user_id,
>>>   count(id) AS count_per_window,
>>>   sum(amount) AS charge_amount_per_window,
>>>   TUMBLE_START(rowtime, INTERVAL '2' YEAR) AS twindow_start,
>>>   TUMBLE_END(rowtime, INTERVAL '2' YEAR) AS twindow_end
>>> FROM
>>>   event_charge_processed
>>> WHERE capture=true
>>> AND COALESCE(user_id, -1) <> -1
>>> GROUP BY
>>>   TUMBLE(rowtime, INTERVAL '2' YEAR),
>>>   COALESCE(user_id, -1)
>>>
>>> For '1' MONTH intervals it results in 1ms windows, 2 MONTH=2ms, 3
>>> MONTH=3ms …. 1 YEAR=12ms, 2 YEAR=24ms! Which results in incorrect
>>> aggregations.
>>>
>>> I found that org.apache.calcite.sql.SqlLiteral#getValueAs() treats
>>> MONTH/YEAR differently than DAY/HOUR etc. Perhaps the bug is somewhere
>>> there (?).
>>>
>>> Is this a known issue? Has it been fixed in later versions?
>>>
>>> Thanks,
>>> Vinod
>>>
>>>


Re: TUMBLE function with MONTH/YEAR intervals produce milliseconds window unexpectedly

2019-03-28 Thread Vinod Mehra
Dawid,

After the above change my SQL (that uses TUMBLE(rowtime, INTERVAL '1'
MONTH)) fails with an error now:

*(testing with org.apache.flink:flink-table_2.11:jar:1.7.1:compile now)*
org.apache.flink.table.api.TableException: *Only constant window intervals
with millisecond resolution are supported*.
at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsLong$1(DataStreamLogicalWindowAggregateRule.scala:73)
at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
at
org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
at
org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
at
org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
at
org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
at
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:305)
at
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:248)

The same exact syntax works fine for DAY intervals. For example:
TUMBLE(rowtime, INTERVAL '30' DAY)

Is the same syntax for MONTH / YEAR intervals not supported?
TUMBLE(rowtime, INTERVAL '1' MONTH)
TUMBLE(rowtime, INTERVAL '1' YEAR)

Thanks,
Vinod

On Thu, Mar 28, 2019 at 12:46 PM Dawid Wysakowicz 
wrote:

> It should be fixed since version 1.6.3.
> Best,
> Dawid
>
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-11017?jql=project%20%3D%20FLINK%20AND%20text%20~%20Month
>
>
> On Thu, 28 Mar 2019, 19:32 Vinod Mehra,  wrote:
>
>> Hi All!
>>
>> We are using: org.apache.flink:flink-table_2.11:jar:1.4.2:compile
>>
>> SELECT
>>   COALESCE(user_id, -1) AS user_id,
>>   count(id) AS count_per_window,
>>   sum(amount) AS charge_amount_per_window,
>>   TUMBLE_START(rowtime, INTERVAL '2' YEAR) AS twindow_start,
>>   TUMBLE_END(rowtime, INTERVAL '2' YEAR) AS twindow_end
>> FROM
>>   event_charge_processed
>> WHERE capture=true
>> AND COALESCE(user_id, -1) <> -1
>> GROUP BY
>>   TUMBLE(rowtime, INTERVAL '2' YEAR),
>>   COALESCE(user_id, -1)
>>
>> For '1' MONTH intervals it results in 1ms windows, 2 MONTH=2ms, 3
>> MONTH=3ms …. 1 YEAR=12ms, 2 YEAR=24ms! Which results in incorrect
>> aggregations.
>>
>> I found that org.apache.calcite.sql.SqlLiteral#getValueAs() treats
>> MONTH/YEAR differently than DAY/HOUR etc. Perhaps the bug is somewhere
>> there (?).
>>
>> Is this a known issue? Has it been fixed in later versions?
>>
>> Thanks,
>> Vinod
>>
>>


Re: Schema Evolution on Dynamic Schema

2019-03-28 Thread Shahar Cizer Kobrinsky
Hmm kinda stuck here. Seems like SQL Group by is translated to a
*GroupAggProcessFunction* which stores a state for every aggregation
element (thus flattening the map items for state store). Seems like there's
no way around it. Am i wrong? is there any way to evolve the map elements
when doing *SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by ..  *?

On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske  wrote:

> Hi,
>
> I think this would work.
> However, you should be aware that all keys are kept forever in state
> (unless you configure idle state retention time [1]).
> This includes previous versions of keys.
>
> Also note that we are not guaranteeing savepoint compatibility across
> Flink versions yet.
> If the state of the aggregation operator changes in a later version (say
> Flink 1.9.x), it might not be possible to migrate to a later Flink version.
> Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> Am Di., 19. März 2019 um 22:27 Uhr schrieb Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com>:
>
>> My bad. it actually did work with
>> Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map
>> group by a
>>
>> do you think thats OK as a workaround? main schema should be changed that
>> way - only keys in the map
>>
>> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
>> shahar.kobrin...@gmail.com> wrote:
>>
>>> Thanks Fabian,
>>>
>>> Im thinking about how to work around that issue and one thing that came
>>> to my mind is to create a map that holds keys & values that can be edited
>>> without changing the schema, though im thinking how to implement it in
>>> Calcite.
>>> Considering the following original SQL in which "metrics" can be
>>> added/deleted/renamed
>>> Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c
>>> Group by a
>>>
>>> im looking both at json_objectagg & map to change it but it seems that
>>> json_objectagg is on a later calcite version and map doesnt work for me.
>>> Trying something like
>>> Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as
>>> metric_map
>>> group by a
>>>
>>> results with "Non-query expression encountered in illegal context"
>>> is my train of thought the right one? if so, do i have a mistake in the
>>> way im trying to implement it?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske  wrote:
>>>
 Hi,

 Restarting a changed query from a savepoint is currently not supported.
 In general this is a very difficult problem as new queries might result
 in completely different execution plans.
 The special case of adding and removing aggregates is easier to solve,
 but the schema of the stored state changes and we would need to analyze the
 previous and current query and generate compatible serializers.
 So far we did not explore this rabbit hole.

 Also, starting a different query from a savepoint can also lead to
 weird result semantics.
 I'd recommend to bootstrap the state of the new query from scatch.

 Best, Fabian



 Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <
 shahar.kobrin...@gmail.com>:

> Or is it the SQL state that is incompatible.. ?
>
> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
>> Thanks Guys,
>>
>> I actually got an error now adding some fields into the select
>> statement:
>>
>> java.lang.RuntimeException: Error while getting state
>> at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>> at
>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>> at
>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>> backends, the new state serializer must not be incompatible.
>> at
>> 

AskTimeoutException - Cannot deploy task

2019-03-28 Thread Avi Levi
Hi,
I see the following exceptions, will really appreciate any help on that

Thanks

Avi


This is the first one (out of three) :


java.lang.Exception: Cannot deploy task KeyedProcess -> Sink: Unnamed
(3/100) (2c9646634afe1488659da404e92697b0) - TaskManager
(container_e03_1553795623823_0001_01_03 @
dataproc-cluster-w-14.XXX (dataPort=45777)) not responding after a
rpcTimeout of 1 ms
at 
org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:624)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://fl...@dataproc-cluster-w-14.c.:38047/user/taskmanager_0#-1921747025]]
after [1 ms]. Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
... 1 more


and this is the second:

rg.apache.flink.util.FlinkException: The assigned slot
container_e03_1553795623823_0001_01_44_3 was removed.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor.aroundReceive(Actor.scala:502)
at akka.actor.Actor.aroundReceive$(Actor.scala:500)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Third one


java.util.concurrent.TimeoutException: Heartbeat of 

Re: Calcite SQL Map to Pojo Map

2019-03-28 Thread shkob1
Apparently the solution is to force map creating using UDF and to have the
UDF return Types.GENERIC(Map.class)
That makes them compatible and treated both as GenericType

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: TUMBLE function with MONTH/YEAR intervals produce milliseconds window unexpectedly

2019-03-28 Thread Vinod Mehra
Thanks Dawid! Can you please point me to a jira which tracked the fix?

Thanks!
Vinod

On Thu, Mar 28, 2019 at 12:46 PM Dawid Wysakowicz 
wrote:

> It should be fixed since version 1.6.3.
> Best,
> Dawid
>
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-11017?jql=project%20%3D%20FLINK%20AND%20text%20~%20Month
>
>
> On Thu, 28 Mar 2019, 19:32 Vinod Mehra,  wrote:
>
>> Hi All!
>>
>> We are using: org.apache.flink:flink-table_2.11:jar:1.4.2:compile
>>
>> SELECT
>>   COALESCE(user_id, -1) AS user_id,
>>   count(id) AS count_per_window,
>>   sum(amount) AS charge_amount_per_window,
>>   TUMBLE_START(rowtime, INTERVAL '2' YEAR) AS twindow_start,
>>   TUMBLE_END(rowtime, INTERVAL '2' YEAR) AS twindow_end
>> FROM
>>   event_charge_processed
>> WHERE capture=true
>> AND COALESCE(user_id, -1) <> -1
>> GROUP BY
>>   TUMBLE(rowtime, INTERVAL '2' YEAR),
>>   COALESCE(user_id, -1)
>>
>> For '1' MONTH intervals it results in 1ms windows, 2 MONTH=2ms, 3
>> MONTH=3ms …. 1 YEAR=12ms, 2 YEAR=24ms! Which results in incorrect
>> aggregations.
>>
>> I found that org.apache.calcite.sql.SqlLiteral#getValueAs() treats
>> MONTH/YEAR differently than DAY/HOUR etc. Perhaps the bug is somewhere
>> there (?).
>>
>> Is this a known issue? Has it been fixed in later versions?
>>
>> Thanks,
>> Vinod
>>
>>


Re: TUMBLE function with MONTH/YEAR intervals produce milliseconds window unexpectedly

2019-03-28 Thread Vinod Mehra
Doh! Sorry about that! :)

Thanks again!

On Thu, Mar 28, 2019 at 12:49 PM Dawid Wysakowicz 
wrote:

> I did ;) but here is the link one more time:
> https://issues.apache.org/jira/browse/FLINK-11017?jql=project%20%3D%20FLINK%20AND%20text%20~%20Month
>
> On Thu, 28 Mar 2019, 20:48 Vinod Mehra,  wrote:
>
>> Thanks Dawid! Can you please point me to a jira which tracked the fix?
>>
>> Thanks!
>> Vinod
>>
>> On Thu, Mar 28, 2019 at 12:46 PM Dawid Wysakowicz 
>> wrote:
>>
>>> It should be fixed since version 1.6.3.
>>> Best,
>>> Dawid
>>>
>>>
>>> [1]
>>> https://issues.apache.org/jira/browse/FLINK-11017?jql=project%20%3D%20FLINK%20AND%20text%20~%20Month
>>>
>>>
>>> On Thu, 28 Mar 2019, 19:32 Vinod Mehra,  wrote:
>>>
 Hi All!

 We are using: org.apache.flink:flink-table_2.11:jar:1.4.2:compile

 SELECT
   COALESCE(user_id, -1) AS user_id,
   count(id) AS count_per_window,
   sum(amount) AS charge_amount_per_window,
   TUMBLE_START(rowtime, INTERVAL '2' YEAR) AS twindow_start,
   TUMBLE_END(rowtime, INTERVAL '2' YEAR) AS twindow_end
 FROM
   event_charge_processed
 WHERE capture=true
 AND COALESCE(user_id, -1) <> -1
 GROUP BY
   TUMBLE(rowtime, INTERVAL '2' YEAR),
   COALESCE(user_id, -1)

 For '1' MONTH intervals it results in 1ms windows, 2 MONTH=2ms, 3
 MONTH=3ms …. 1 YEAR=12ms, 2 YEAR=24ms! Which results in incorrect
 aggregations.

 I found that org.apache.calcite.sql.SqlLiteral#getValueAs() treats
 MONTH/YEAR differently than DAY/HOUR etc. Perhaps the bug is somewhere
 there (?).

 Is this a known issue? Has it been fixed in later versions?

 Thanks,
 Vinod




Re: TUMBLE function with MONTH/YEAR intervals produce milliseconds window unexpectedly

2019-03-28 Thread Dawid Wysakowicz
I did ;) but here is the link one more time:
https://issues.apache.org/jira/browse/FLINK-11017?jql=project%20%3D%20FLINK%20AND%20text%20~%20Month

On Thu, 28 Mar 2019, 20:48 Vinod Mehra,  wrote:

> Thanks Dawid! Can you please point me to a jira which tracked the fix?
>
> Thanks!
> Vinod
>
> On Thu, Mar 28, 2019 at 12:46 PM Dawid Wysakowicz 
> wrote:
>
>> It should be fixed since version 1.6.3.
>> Best,
>> Dawid
>>
>>
>> [1]
>> https://issues.apache.org/jira/browse/FLINK-11017?jql=project%20%3D%20FLINK%20AND%20text%20~%20Month
>>
>>
>> On Thu, 28 Mar 2019, 19:32 Vinod Mehra,  wrote:
>>
>>> Hi All!
>>>
>>> We are using: org.apache.flink:flink-table_2.11:jar:1.4.2:compile
>>>
>>> SELECT
>>>   COALESCE(user_id, -1) AS user_id,
>>>   count(id) AS count_per_window,
>>>   sum(amount) AS charge_amount_per_window,
>>>   TUMBLE_START(rowtime, INTERVAL '2' YEAR) AS twindow_start,
>>>   TUMBLE_END(rowtime, INTERVAL '2' YEAR) AS twindow_end
>>> FROM
>>>   event_charge_processed
>>> WHERE capture=true
>>> AND COALESCE(user_id, -1) <> -1
>>> GROUP BY
>>>   TUMBLE(rowtime, INTERVAL '2' YEAR),
>>>   COALESCE(user_id, -1)
>>>
>>> For '1' MONTH intervals it results in 1ms windows, 2 MONTH=2ms, 3
>>> MONTH=3ms …. 1 YEAR=12ms, 2 YEAR=24ms! Which results in incorrect
>>> aggregations.
>>>
>>> I found that org.apache.calcite.sql.SqlLiteral#getValueAs() treats
>>> MONTH/YEAR differently than DAY/HOUR etc. Perhaps the bug is somewhere
>>> there (?).
>>>
>>> Is this a known issue? Has it been fixed in later versions?
>>>
>>> Thanks,
>>> Vinod
>>>
>>>


Re: TUMBLE function with MONTH/YEAR intervals produce milliseconds window unexpectedly

2019-03-28 Thread Dawid Wysakowicz
It should be fixed since version 1.6.3.
Best,
Dawid


[1]
https://issues.apache.org/jira/browse/FLINK-11017?jql=project%20%3D%20FLINK%20AND%20text%20~%20Month


On Thu, 28 Mar 2019, 19:32 Vinod Mehra,  wrote:

> Hi All!
>
> We are using: org.apache.flink:flink-table_2.11:jar:1.4.2:compile
>
> SELECT
>   COALESCE(user_id, -1) AS user_id,
>   count(id) AS count_per_window,
>   sum(amount) AS charge_amount_per_window,
>   TUMBLE_START(rowtime, INTERVAL '2' YEAR) AS twindow_start,
>   TUMBLE_END(rowtime, INTERVAL '2' YEAR) AS twindow_end
> FROM
>   event_charge_processed
> WHERE capture=true
> AND COALESCE(user_id, -1) <> -1
> GROUP BY
>   TUMBLE(rowtime, INTERVAL '2' YEAR),
>   COALESCE(user_id, -1)
>
> For '1' MONTH intervals it results in 1ms windows, 2 MONTH=2ms, 3
> MONTH=3ms …. 1 YEAR=12ms, 2 YEAR=24ms! Which results in incorrect
> aggregations.
>
> I found that org.apache.calcite.sql.SqlLiteral#getValueAs() treats
> MONTH/YEAR differently than DAY/HOUR etc. Perhaps the bug is somewhere
> there (?).
>
> Is this a known issue? Has it been fixed in later versions?
>
> Thanks,
> Vinod
>
>


Support for custom triggers in Table / SQL

2019-03-28 Thread Piyush Narang
Hi folks,

I’m trying to write a Flink job that computes a bunch of counters which 
requires custom triggers and I was trying to figure out the best way to express 
that.

The query looks something like this:
SELECT userId, customUDAF(...) AS counter1, customUDAF(...) AS counter2, ...
FROM (
SELECT * FROM my_kafka_stream
)
GROUP BY userId, HOP(`timestamp`, INTERVAL '6' HOUR, INTERVAL '7' DAY)

We sink this to a KV store (memcache / couchbase) for persistence.

Some of these counters end up spanning a pretty wide time window (longest is 7 
days) and if we want to keep the state tractable we have to have a pretty large 
slide interval (6 hours or greater). A requirement that some of our users have 
is for counters to be updated fairly frequently (e.g. every min) so we were 
looking at how to achieve that with the Table / SQL api. I see that this is 
possible using the custom 
triggers
 support if we were to use the Datastream api but I’m wondering if this is 
possible using the Table / SQL apis.

I did see another thread where Fabian brought up this design 
doc
 which has listed what support for emit triggers would look like (in various 
streaming platforms). Is this something that is being actively worked on? If 
not, any suggestions on how we could get the ball rolling on this? (google doc 
design + jira?)

Thanks,

-- Piyush



Re: Throttling/effective back-pressure on a Kafka sink

2019-03-28 Thread Konstantin Knauf
Hi Marc,

the Kafka Producer should be able to create backpressure. Could you try to
increase max.block.ms to Long.MAX_VALUE?

The exceptions you shared for the failure case don't look like the root
causes of the problem. Could you share the full stacktraces or even full
logs for this time frame. Feel free to send these logs to me directly, if
you don't want to share them on the list.

Best,

Konstantin




On Thu, Mar 28, 2019 at 2:04 PM Marc Rooding  wrote:

> Hi
>
> We’ve got a job producing to a Kafka sink. The Kafka topics have a
> retention of 2 weeks. When doing a complete replay, it seems like Flink
> isn’t able to back-pressure or throttle the amount of messages going to
> Kafka, causing the following error:
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: Expiring 8396 record(s) for topic-1:12 ms has
> passed since batch creation
>
> We’re running on Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka
> cluster is running version 2.1.1. The Kafka producer uses all default
> settings except from:
>
> compression.type = snappy
> max.in.flight.requests.per.connection = 1
> acks = all
> client.dns.lookup = use_all_dns_ips
>
> I tried playing around with the buffer and batch settings, increasing
> timeouts, but none seem to be what we need. Increasing the
> delivery.timeout.ms and request.timeout.ms solves the initial error, but
> causes the Flink job to fail entirely due to:
>
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>
> My assumption is that the Kafka producer will start blocking since it
> notices that it can't handle the batches, and Flink eventually runs out of
> buffers for the operator.
>
> What really baffles me is that the backpressure tab shows that everything
> is OK. The entire job pipeline (which reads from 4 different topics, unions
> them all and sinks towards 1 topic) pushes all the messages through to the
> sink stage, resulting in 18 million incoming stage messages, even though
> Kafka is in no way possible to keep up with this.
>
> I searched for others facing the same issue but can't find anything
> similar. I'm hoping that someone here could guide me in the right direction.
>
> Thanks in advance
>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


TUMBLE function with MONTH/YEAR intervals produce milliseconds window unexpectedly

2019-03-28 Thread Vinod Mehra
Hi All!

We are using: org.apache.flink:flink-table_2.11:jar:1.4.2:compile

SELECT
  COALESCE(user_id, -1) AS user_id,
  count(id) AS count_per_window,
  sum(amount) AS charge_amount_per_window,
  TUMBLE_START(rowtime, INTERVAL '2' YEAR) AS twindow_start,
  TUMBLE_END(rowtime, INTERVAL '2' YEAR) AS twindow_end
FROM
  event_charge_processed
WHERE capture=true
AND COALESCE(user_id, -1) <> -1
GROUP BY
  TUMBLE(rowtime, INTERVAL '2' YEAR),
  COALESCE(user_id, -1)

For '1' MONTH intervals it results in 1ms windows, 2 MONTH=2ms, 3 MONTH=3ms
…. 1 YEAR=12ms, 2 YEAR=24ms! Which results in incorrect aggregations.

I found that org.apache.calcite.sql.SqlLiteral#getValueAs() treats
MONTH/YEAR differently than DAY/HOUR etc. Perhaps the bug is somewhere
there (?).

Is this a known issue? Has it been fixed in later versions?

Thanks,
Vinod


Re: Calcite SQL Map to Pojo Map

2019-03-28 Thread Shahar Cizer Kobrinsky
Based on this discussion
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/HashMap-HashSet-Serialization-Issue-td10899.html
this seems by design that HashMap/Map are handled as GenericTypes .
However that doesn't work with the query result table schema which
generates a Map type.

On Thu, Mar 28, 2019 at 10:32 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> Hey Rong,
>
> I don't think this is about a UDF, i reproduce the same exception with a
> simple map['a','b'] where the Pojo has a Map property
> btw for the UDF i'm already doing it (clazz is based on the specific map
> im creating):
>
> @Override
> public TypeInformation getResultType(Class[] signature) {
> return Types.MAP(Types.STRING, TypeInformation.of(clazz));
> }
>
> The table schema looks good but looking at the PojoTypeInfo fields the Map
> field is a GenericType - this causes the exception to be thrown on
> TableEnvironment.generateRowConverterFunction
>
>
> On Thu, Mar 28, 2019 at 8:56 AM Rong Rong  wrote:
>
>> If your conversion is done using a UDF you need to override the
>> getResultType method [1] to explicitly specify the key and value type
>> information. As generic erasure will not preseve the  part
>> of your code.
>>
>> Thanks,
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions
>>
>> On Wed, Mar 27, 2019 at 10:14 AM shkob1 
>> wrote:
>>
>>> Im trying to convert a SQL query that has a select map[..] into a pojo
>>> with
>>> Map (using tableEnv.toRestractedStream )
>>> It seems to fail when the field requestedTypeInfo is GenericTypeInfo with
>>> GenericType while the field type itself is MapTypeInfo
>>> with
>>> Map
>>>
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> Result
>>> field does not match requested type. Requested:
>>> GenericType;
>>> Actual: Map
>>>
>>> Any suggestion?
>>> Shahar
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Calcite SQL Map to Pojo Map

2019-03-28 Thread Shahar Cizer Kobrinsky
Hey Rong,

I don't think this is about a UDF, i reproduce the same exception with a
simple map['a','b'] where the Pojo has a Map property
btw for the UDF i'm already doing it (clazz is based on the specific map im
creating):

@Override
public TypeInformation getResultType(Class[] signature) {
return Types.MAP(Types.STRING, TypeInformation.of(clazz));
}

The table schema looks good but looking at the PojoTypeInfo fields the Map
field is a GenericType - this causes the exception to be thrown on
TableEnvironment.generateRowConverterFunction


On Thu, Mar 28, 2019 at 8:56 AM Rong Rong  wrote:

> If your conversion is done using a UDF you need to override the
> getResultType method [1] to explicitly specify the key and value type
> information. As generic erasure will not preseve the  part
> of your code.
>
> Thanks,
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions
>
> On Wed, Mar 27, 2019 at 10:14 AM shkob1 
> wrote:
>
>> Im trying to convert a SQL query that has a select map[..] into a pojo
>> with
>> Map (using tableEnv.toRestractedStream )
>> It seems to fail when the field requestedTypeInfo is GenericTypeInfo with
>> GenericType while the field type itself is MapTypeInfo with
>> Map
>>
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Result
>> field does not match requested type. Requested:
>> GenericType;
>> Actual: Map
>>
>> Any suggestion?
>> Shahar
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Calcite SQL Map to Pojo Map

2019-03-28 Thread Rong Rong
If your conversion is done using a UDF you need to override the
getResultType method [1] to explicitly specify the key and value type
information. As generic erasure will not preseve the  part
of your code.

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions

On Wed, Mar 27, 2019 at 10:14 AM shkob1  wrote:

> Im trying to convert a SQL query that has a select map[..] into a pojo with
> Map (using tableEnv.toRestractedStream )
> It seems to fail when the field requestedTypeInfo is GenericTypeInfo with
> GenericType while the field type itself is MapTypeInfo with
> Map
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Result
> field does not match requested type. Requested: GenericType;
> Actual: Map
>
> Any suggestion?
> Shahar
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


IllegalArgumentException when trying to execute job

2019-03-28 Thread Papadopoulos, Konstantinos
Hi all,

I am trying to execute a batch job that gets a list of IDs and perform a loop 
with a number of steps during each iteration including reading from a MS SQL 
Server DB.

A sample pseudo-code of our implementation is the following:

List ids = ...

ids.foreach(
 id -> executeIteration();
)

void executeIteration {
 // 1. Read from source DB
 // 2. Perform a number of transformations
 // 3. Write data to local file
}

When I am trying to execute the aforementioned job to a local Flink cluster, 
job fails retrieving the following exception:

java.lang.IllegalArgumentException: open() failed.The query processor could not 
start the necessary thread resources for parallel query execution.
 at 
org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:241)
 at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
 at java.lang.Thread.run(Unknown Source)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The query processor 
could not start the necessary thread resources for parallel query execution.
 at 
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:216)
 at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:4869)
 at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1781)
 at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1034)
 at 
org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:239)
 ... 3 more

Does anyone have any idea why this is happening?

Thanks in advance,
Konstantinos


问下大家,有做好的blink的docker image镜像吗?能够共享下坐标或者dockerfile,谢谢

2019-03-28 Thread 陈韬
问下大家,有做好的blink的docker image镜像吗?能够共享下坐标或者dockerfile,谢谢

??????????????Flink????

2019-03-28 Thread ????

   
Flink???(Flinkjarweb??)

Throttling/effective back-pressure on a Kafka sink

2019-03-28 Thread Marc Rooding
Hi

We’ve got a job producing to a Kafka sink. The Kafka topics have a retention of 
2 weeks. When doing a complete replay, it seems like Flink isn’t able to 
back-pressure or throttle the amount of messages going to Kafka, causing the 
following error:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Expiring 8396 record(s) for topic-1:12 ms has passed since 
batch creation

We’re running on Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka 
cluster is running version 2.1.1. The Kafka producer uses all default settings 
except from:

compression.type = snappy
max.in.flight.requests.per.connection = 1
acks = all
client.dns.lookup = use_all_dns_ips

I tried playing around with the buffer and batch settings, increasing timeouts, 
but none seem to be what we need. Increasing the delivery.timeout.ms and 
request.timeout.ms solves the initial error, but causes the Flink job to fail 
entirely due to:

Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.

My assumption is that the Kafka producer will start blocking since it notices 
that it can't handle the batches, and Flink eventually runs out of buffers for 
the operator.

What really baffles me is that the backpressure tab shows that everything is 
OK. The entire job pipeline (which reads from 4 different topics, unions them 
all and sinks towards 1 topic) pushes all the messages through to the sink 
stage, resulting in 18 million incoming stage messages, even though Kafka is in 
no way possible to keep up with this.

I searched for others facing the same issue but can't find anything similar. 
I'm hoping that someone here could guide me in the right direction.

Thanks in advance



Re: RemoteTransportException: Connection unexpectedly closed by remote task manager

2019-03-28 Thread yinhua.dai
I have put the task manager of the data sink log to
https://gist.github.com/yinhua2018/7de42ff9c1738d5fdf9d99030db903e2



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: RemoteTransportException: Connection unexpectedly closed by remote task manager

2019-03-28 Thread yinhua.dai
Hi Qi,

I checked the JVM heap of the sink TM is low.

I tried to read flink source code to identify where is exact the error
happen.
I think the exception happened inside DataSinkTask.invoke()

// work!
while (!this.taskCanceled && ((record = 
input.next()) != null)) {
numRecordsIn.inc();
format.writeRecord(record);
}

RemoteTransportException should be thrown from "input.next()" when InputGate
tried to read data from the upstream.
Is this really a problem for this sink TM?
I'm a little bit confused.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: RemoteTransportException: Connection unexpectedly closed by remote task manager

2019-03-28 Thread qi luo
Hi Yinhua,

This looks like the TM executing the sink is down, maybe due to OOM or some 
other error. You can check the JVM heap and GC log to see if there’re any clues.

Regards,
Qi

> On Mar 28, 2019, at 7:23 PM, yinhua.dai  wrote:
> 
> Hi,
> 
> I write a single flink job with flink SQL with version 1.6.1
> I have one table source which read data from a database, and one table sink
> to output as avro format file.
> The table source has parallelism of 19, and table sink only has parallelism
> of 1.
> 
> But there is always RemoteTransportException when the job is nearly done(All
> data source has been finished, and the data sink has been running for a
> while).
> The detail error as below:
> 
> 2019-03-28 07:53:49,086 ERROR
> org.apache.flink.runtime.operators.DataSinkTask   - Error in
> user code: Connection unexpectedly closed by remote task manager
> 'ip-10-97-34-40.tr-fr-nonprod.aws-int.thomsonreuters.com/10.97.34.40:46625'.
> This might indicate that the remote task manager was lost.:  DataSink
> (com.tr.apt.sqlengine.tables.s3.AvroFileTableSink$AvroOutputFormat@42d174ad)
> (1/1)
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager
> 'ip-10-97-34-40.tr-fr-nonprod.aws-int.thomsonreuters.com/10.97.34.40:46625'.
> This might indicate that the remote task manager was lost.
>at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
>at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
>at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
>at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
>at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
>at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>at java.lang.Thread.run(Thread.java:748)
> 2019-03-28 07:53:49,440 INFO 
> com.tr.apt.sqlengine.tables.s3.AbstractFileOutputFormat   -
> FileTableSink sinked all data to : file:///tmp/shareamount.avro
> 2019-03-28 07:53:49,441 INFO  org.apache.flink.runtime.taskmanager.Task   
>  
> - DataSink
> (com.tr.apt.sqlengine.tables.s3.AvroFileTableSink$AvroOutputFormat@42d174ad)
> (1/1) (31fd3e6fdbb1576e7288e202fff69b07) switched from RUNNING to FAILED.
> 
> 
> Is the error means that the data sink failed to read all of data from some
> data source instance before the source end itself?
> 
> When I check the log of task manager (10.97.34.40:46625), it's all ok, it
> shows it successfully finished its job and receive SIGNAL 15 and then
> terminate itself.
> So how should I find out the root cause of the error?
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: questions regarding offset

2019-03-28 Thread Avi Levi
Thanks for answering. please see my comments below

On Thu, Mar 28, 2019 at 12:32 PM Dawid Wysakowicz 
wrote:

> Hi Avi,
>
> Yes, you are right. Kafka offsets are kept in state.
>
> Ad. 1 If you try to restore a state in a completely different
> environment, and offsets are no longer compatible it will most probably
> fail as it won't be able to derive up to which point we already
> processed the records.
>
So there is no way to move state between clusters ? I thought that the
offsets are managed also by job id. butI guess I was wrong

>
> Ad.2 What do you mean by stateless job? Do you mean a job with
> checkpoints disabled? If so then the job does not checkpoint kafka
> offsets. They might be committed back to Kafka based on the internal
> Kafka consumer configuration[1]. So in case of failover it will use
> given start position configuration[2].
>

By stateless I mean a job without need to persist a state but with
checkpoints enabled.

>
> Best,
>
> Dawid
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
>
> On 28/03/2019 06:51, Avi Levi wrote:
> > Hi Guys,
> > I understood that offset is kept as part of the checkpoint and
> > persisted in the state (please correct me if I'm wrong)
> >
> > 1. If I copy my persisted state to another cluster (different kafka
> > servers as well) how is the offset handled ?
> > 2. In a stateless job how is the offset managed ? since there is no
> > persistency . I mean in aspect of exactly once, recovery ...
> >
> > BR
> > Avi
>
>


Re: Flink Job 监控

2019-03-28 Thread Biao Liu
Hi, 可以了解下 RESTful API
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html

cheng  于2019年3月28日周四 下午5:08写道:

> 我们目前是用standalone 模式部署的集群,请问这个job state 有关于job是否挂掉或者重启的指标?我看官方文档好像没找到。
>
> > 在 2019年3月28日,下午4:51,浪人 <1543332...@qq.com> 写道:
> >
> > 如果是使用flink集成cluster可以监控flink的job state,如果是yarn是超脱模式可以监控yarn的状态。
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人: "cheng";
> > 发送时间: 2019年3月28日(星期四) 下午4:38
> > 收件人: "user-zh";
> >
> > 主题: Flink Job 监控
> >
> >
> >
> > 各位好!
> >   请教下各位,Flink Job 在生产上运行时,关于job运行状态的监控和告警一般是采用什么方案处理的?
> 比如监控job是否在正常运行,如果发现job 挂掉了 或者重启了 就进行告警。我这边有将一些metric 推到prometheus
> 但是好像没有发现关于job是否挂掉的metric。
> >   希望有做过这种方案的朋友能赐教下,谢谢了!!
>
>


RemoteTransportException: Connection unexpectedly closed by remote task manager

2019-03-28 Thread yinhua.dai
Hi,

I write a single flink job with flink SQL with version 1.6.1
I have one table source which read data from a database, and one table sink
to output as avro format file.
The table source has parallelism of 19, and table sink only has parallelism
of 1.

But there is always RemoteTransportException when the job is nearly done(All
data source has been finished, and the data sink has been running for a
while).
The detail error as below:

2019-03-28 07:53:49,086 ERROR
org.apache.flink.runtime.operators.DataSinkTask   - Error in
user code: Connection unexpectedly closed by remote task manager
'ip-10-97-34-40.tr-fr-nonprod.aws-int.thomsonreuters.com/10.97.34.40:46625'.
This might indicate that the remote task manager was lost.:  DataSink
(com.tr.apt.sqlengine.tables.s3.AvroFileTableSink$AvroOutputFormat@42d174ad)
(1/1)
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager
'ip-10-97-34-40.tr-fr-nonprod.aws-int.thomsonreuters.com/10.97.34.40:46625'.
This might indicate that the remote task manager was lost.
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at java.lang.Thread.run(Thread.java:748)
2019-03-28 07:53:49,440 INFO 
com.tr.apt.sqlengine.tables.s3.AbstractFileOutputFormat   -
FileTableSink sinked all data to : file:///tmp/shareamount.avro
2019-03-28 07:53:49,441 INFO  org.apache.flink.runtime.taskmanager.Task 
   
- DataSink
(com.tr.apt.sqlengine.tables.s3.AvroFileTableSink$AvroOutputFormat@42d174ad)
(1/1) (31fd3e6fdbb1576e7288e202fff69b07) switched from RUNNING to FAILED.


Is the error means that the data sink failed to read all of data from some
data source instance before the source end itself?

When I check the log of task manager (10.97.34.40:46625), it's all ok, it
shows it successfully finished its job and receive SIGNAL 15 and then
terminate itself.
So how should I find out the root cause of the error?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: What are savepoint state manipulation support plans

2019-03-28 Thread Ufuk Celebi
Thanks Gordon. We already have 5 people watching it. :-)

On Thu, Mar 28, 2019 at 10:23 AM Tzu-Li (Gordon) Tai
 wrote:
>
> @Ufuk
>
> Yes, creating a JIRA now already to track this makes sense.
>
> I've proceeded to open one:  https://issues.apache.org/jira/browse/FLINK-12047
> Let's move any further discussions there.
>
> Cheers,
> Gordon
>
> On Thu, Mar 28, 2019 at 5:01 PM Ufuk Celebi  wrote:
>>
>> I think such a tool would be really valuable to users.
>>
>> @Gordon: What do you think about creating an umbrella ticket for this
>> and linking it in this thread? That way, it's easier to follow this
>> effort. You could also link Bravo and Seth's tool in the ticket as
>> starting points.
>>
>> – Ufuk


Re: questions regarding offset

2019-03-28 Thread Dawid Wysakowicz
Hi Avi,

Yes, you are right. Kafka offsets are kept in state.

Ad. 1 If you try to restore a state in a completely different
environment, and offsets are no longer compatible it will most probably
fail as it won't be able to derive up to which point we already
processed the records.

Ad.2 What do you mean by stateless job? Do you mean a job with
checkpoints disabled? If so then the job does not checkpoint kafka
offsets. They might be committed back to Kafka based on the internal
Kafka consumer configuration[1]. So in case of failover it will use
given start position configuration[2].

Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-start-position-configuration


On 28/03/2019 06:51, Avi Levi wrote:
> Hi Guys,
> I understood that offset is kept as part of the checkpoint and
> persisted in the state (please correct me if I'm wrong)
>
> 1. If I copy my persisted state to another cluster (different kafka
> servers as well) how is the offset handled ? 
> 2. In a stateless job how is the offset managed ? since there is no
> persistency . I mean in aspect of exactly once, recovery ...
>
> BR
> Avi



signature.asc
Description: OpenPGP digital signature


Re: What are savepoint state manipulation support plans

2019-03-28 Thread Tzu-Li (Gordon) Tai
@Ufuk

Yes, creating a JIRA now already to track this makes sense.

I've proceeded to open one:
https://issues.apache.org/jira/browse/FLINK-12047
Let's move any further discussions there.

Cheers,
Gordon

On Thu, Mar 28, 2019 at 5:01 PM Ufuk Celebi  wrote:

> I think such a tool would be really valuable to users.
>
> @Gordon: What do you think about creating an umbrella ticket for this
> and linking it in this thread? That way, it's easier to follow this
> effort. You could also link Bravo and Seth's tool in the ticket as
> starting points.
>
> – Ufuk
>


Re: What are savepoint state manipulation support plans

2019-03-28 Thread Vishal Santoshi
+1

On Thu, Mar 28, 2019, 5:01 AM Ufuk Celebi  wrote:

> I think such a tool would be really valuable to users.
>
> @Gordon: What do you think about creating an umbrella ticket for this
> and linking it in this thread? That way, it's easier to follow this
> effort. You could also link Bravo and Seth's tool in the ticket as
> starting points.
>
> – Ufuk
>


Re: RocksDBStatebackend does not write checkpoints to backup path

2019-03-28 Thread Paul Lam
Hi Gordon,

Thanks for your reply. I’ve found out that it should be a bug of 
RocksDBStateBackend [1].

[1] https://issues.apache.org/jira/browse/FLINK-12042 


Best,
Paul Lam

> 在 2019年3月28日,17:03,Tzu-Li (Gordon) Tai  写道:
> 
> Hi,
> 
> Do you have the full error message of the failure?
> A wild guess to begin with: have you made sure that there are sufficient 
> permissions to create the directory?
> 
> Best,
> Gordon
> 
> On Tue, Mar 26, 2019 at 5:46 PM Paul Lam  > wrote:
> Hi,
> 
> I have a job (with Flink 1.6.4) which uses rocksdb incremental checkpointing, 
> but the checkpointing always fails with `IllegalStateException`, 
> because hen performing `RocksDBIncrementalSnapshotOperation`, rocksdb finds 
> that `localBackupDirectory`, which should be created earlier
> by rocksdb checkpoint, doesn’t exist. But there is no error message about 
> failures of rocksdb checkpoint. 
> 
> What could possibly be the cause? Thanks a lot!
> 
> Best,
> Paul Lam
> 



Re: Flink Job 监控

2019-03-28 Thread cheng
我们目前是用standalone 模式部署的集群,请问这个job state 有关于job是否挂掉或者重启的指标?我看官方文档好像没找到。

> 在 2019年3月28日,下午4:51,浪人 <1543332...@qq.com> 写道:
> 
> 如果是使用flink集成cluster可以监控flink的job state,如果是yarn是超脱模式可以监控yarn的状态。
> 
> 
> 
> 
> -- 原始邮件 --
> 发件人: "cheng";
> 发送时间: 2019年3月28日(星期四) 下午4:38
> 收件人: "user-zh";
> 
> 主题: Flink Job 监控
> 
> 
> 
> 各位好!
>   请教下各位,Flink Job 在生产上运行时,关于job运行状态的监控和告警一般是采用什么方案处理的? 
> 比如监控job是否在正常运行,如果发现job 挂掉了 或者重启了 就进行告警。我这边有将一些metric 推到prometheus 
> 但是好像没有发现关于job是否挂掉的metric。
>   希望有做过这种方案的朋友能赐教下,谢谢了!!



Re: RocksDBStatebackend does not write checkpoints to backup path

2019-03-28 Thread Tzu-Li (Gordon) Tai
Hi,

Do you have the full error message of the failure?
A wild guess to begin with: have you made sure that there are sufficient
permissions to create the directory?

Best,
Gordon

On Tue, Mar 26, 2019 at 5:46 PM Paul Lam  wrote:

> Hi,
>
> I have a job (with Flink 1.6.4) which uses rocksdb incremental
> checkpointing, but the checkpointing always fails with
> `IllegalStateException`,
> because hen performing `RocksDBIncrementalSnapshotOperation`, rocksdb
> finds that `localBackupDirectory`, which should be created earlier
> by rocksdb checkpoint, doesn’t exist. But there is no error message about
> failures of rocksdb checkpoint.
>
> What could possibly be the cause? Thanks a lot!
>
> Best,
> Paul Lam
>
>


关于Blink 消费kafka并行度问题

2019-03-28 Thread 邓成刚【qq】
请教一下,Blink 消费kafka数据时,把并行度设置 30 ,就会出现Timeout,JOB跑不起来,应该是没有消费到数据,把并行度调 到 
5就没问题,另外,JOB用到4个TOPic,每个30个PARTITION,但是把这同样JOB提交给FLINK 设置 30 并行度 
就可以跑起来,有哪位大佬知道什么情况吗?



Re: What are savepoint state manipulation support plans

2019-03-28 Thread Ufuk Celebi
I think such a tool would be really valuable to users.

@Gordon: What do you think about creating an umbrella ticket for this
and linking it in this thread? That way, it's easier to follow this
effort. You could also link Bravo and Seth's tool in the ticket as
starting points.

– Ufuk


Re: RocksDB local snapshot sliently disappears and cause checkpoint to fail

2019-03-28 Thread Yu Li
Ok, much clearer now. Thanks.

Best Regards,
Yu


On Thu, 28 Mar 2019 at 15:59, Paul Lam  wrote:

> Hi Yu,
>
> I’ve set `fs.default-scheme` to hdfs, and it's mainly used for simplifying
> checkpoint / savepoint / HA paths.
>
> And I leave the rocksdb local dir empty, so the local snapshot still goes
> to YARN local cache dirs.
>
> Hope that answers your question.
>
> Best,
> Paul Lam
>
> 在 2019年3月28日,15:34,Yu Li  写道:
>
> Hi Paul,
>
> Regarding "mistakenly uses the default filesystem scheme, which is
> specified to hdfs in the new cluster in my case", could you further clarify
> the configuration property and value you're using? Do you mean you're using
> an HDFS directory to store the local snapshot data? Thanks.
> Best Regards,
> Yu
>
>
>
> On Thu, 28 Mar 2019 at 14:34, Paul Lam  wrote:
>
>> Hi,
>>
>> It turns out that under certain circumstances rocksdb statebackend
>> mistakenly uses the default filesystem scheme, which is specified to hdfs
>> in the new cluster in my case.
>>
>> I’ve filed a Jira to track this[1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12042
>>
>> Best,
>> Paul Lam
>>
>> 在 2019年3月27日,19:06,Paul Lam  写道:
>>
>> Hi,
>>
>> I’m using Flink 1.6.4 and recently I ran into a weird issue of rocksdb
>> statebackend. A job that runs fine on a YARN cluster keeps failing on
>> checkpoint after migrated to a new one
>> (with almost everything the same but better machines), and even a clean
>> restart doesn’t help.
>>
>> The root cause is IllegalStateException but with no error message. The
>> stack trace shows that when the rocksdb statebackend is doing the async
>> part of snapshots (runSnapshot),
>> it finds that the local snapshot directory that is created by rocksdb
>> earlier (takeSnapshot) does not exist.
>>
>> I tried to log more informations in RocksDBKeyedStateBackend (see
>> attachment), and found that the local snapshot performed as expected and
>> the .sst files were written,
>> but when the async task accessed the directory, the whole snapshot
>> directory was gone.
>>
>> What could possibly be the cause? Thanks a lot.
>>
>> Best,
>> Paul Lam
>>
>> 
>>
>>
>>


??????Flink Job ????

2019-03-28 Thread ????
??flinkclusterflink??job 
state,??yarn??yarn




--  --
??: "cheng";
: 2019??3??28??(??) 4:38
??: "user-zh";

: Flink Job 




Flink Job 
job?? 
jobjob ?? ?? 
??metric prometheus 
job??metric??


Flink Job 监控

2019-03-28 Thread cheng
各位好!
请教下各位,Flink Job 在生产上运行时,关于job运行状态的监控和告警一般是采用什么方案处理的? 
比如监控job是否在正常运行,如果发现job 挂掉了 或者重启了 就进行告警。我这边有将一些metric 推到prometheus 
但是好像没有发现关于job是否挂掉的metric。
希望有做过这种方案的朋友能赐教下,谢谢了!!

Flink Job 监控

2019-03-28 Thread cheng
各位好!
请教下各位,Flink Job 在生产上运行时,关于job运行状态的监控和告警一般是采用什么方案处理的? 
比如监控job是否在正常运行,如果发现job 挂掉了 或者重启了 就进行告警。我这边有将一些metric 推到prometheus 
但是好像没有发现关于job是否挂掉的metric。
希望有做过这种方案的朋友能赐教下,谢谢了!!

Re: What are savepoint state manipulation support plans

2019-03-28 Thread Tzu-Li (Gordon) Tai
Hi!

Regarding the support for savepoint reading / writing / processing directly
in core Flink, we've been thinking about that lately and might push a bit
for adding the functionality to Flink in the next release.
For example, beside Bravo, Seth (CC'ed) also had implemented something [1]
for this. We should start thinking about converging the efforts of similar
tools and supporting it in Flink soon.
There's no official JIRA / feature proposal for this yet, but if you're
interested, please keep an eye on the dev mailing list for it in the future.

Cheers,
Gordon

[1] https://github.com/sjwiesman/flink/tree/savepoint-connector

On Thu, Mar 28, 2019 at 4:26 PM Gyula Fóra  wrote:

> Hi!
>
> I dont think there is any ongoing effort in core Flink other than this
> library we created.
>
> You are probably right that it is pretty hacky at the moment. I would say
> this one way we could do it that seemed convenient to me at the time I have
> written the code.
>
> If you have ideas how to structure it better or improve it, you know
> where to find the code, feel free to open a PR :) That might actually takes
> us closer to having this properly in flink one day soon.
>
> Just to clarify the code you are showing:
> writer.writeAll() -> Runs the batch job that writes the checkpoint files
> for the changed operator states, returns the reference to the OperatorState
> metadata object
> StateMetadataUtils.createNewSavepoint() -> Replaces the metadata for the
> operator states you have just written in the previous savepoint
> StateMetadataUtils.writeSavepointMetadata() -> Writes a new metadata file
>
> So metadata writing happens as the very last step after the batch job has
> run. This is similar to how it works in streaming jobs in the sense there
> the jobmanager writes the metafile after the checkpointing is done. The
> downside of this approach is that the client might not have access to write
> the metafile here.
>
> Gyula
>
>
>


Re: What are savepoint state manipulation support plans

2019-03-28 Thread Gyula Fóra
Hi!

I dont think there is any ongoing effort in core Flink other than this
library we created.

You are probably right that it is pretty hacky at the moment. I would say
this one way we could do it that seemed convenient to me at the time I have
written the code.

If you have ideas how to structure it better or improve it, you know
where to find the code, feel free to open a PR :) That might actually takes
us closer to having this properly in flink one day soon.

Just to clarify the code you are showing:
writer.writeAll() -> Runs the batch job that writes the checkpoint files
for the changed operator states, returns the reference to the OperatorState
metadata object
StateMetadataUtils.createNewSavepoint() -> Replaces the metadata for the
operator states you have just written in the previous savepoint
StateMetadataUtils.writeSavepointMetadata() -> Writes a new metadata file

So metadata writing happens as the very last step after the batch job has
run. This is similar to how it works in streaming jobs in the sense there
the jobmanager writes the metafile after the checkpointing is done. The
downside of this approach is that the client might not have access to write
the metafile here.

Gyula


Re: blink消费kafka出现诡异的情况,困扰很久了,哪位大佬知道怎么回事

2019-03-28 Thread 邓成刚【qq】
通过测试发现,不是sql 脚本的问题,是并行度的问题,30个并行度不行,改成5就OK了。。。
env.setParallelism(5);
 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:17
收件人: user-zh
主题: blink消费kafka出现诡异的情况,困扰很久了,哪位大佬知道怎么回事
HI,各位大佬:
      发现一个很诡异的问题:使用SQL API时,在窗口上group by,JOB 5分钟后会timeout,但如果改成select * 
就能正常消费kafka。。。
说明:本地模式和提交JOB均存在此异常
相关信息:
blink 1.5.1
kafka 1.1.1
flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar
 
消费正常的code:
 
String sql = "select * from table1"
 
Table sip_distinct_event_id = tableEnv.sqlQuery( sql );           
 
tableEnv.toRetractStream(sip_distinct_event_id, Row.class).print();
env.execute("myjob2");
 
 
 
如果把SQL换成如下就会timeout...
 
String sql ="select TUMBLE_START(EVENTTIME,INTERVAL '1' MINUTE) AS 
EVENTTIME,NEW_EVENT_ID,MSISDN from   
       +"select EVENTTIME,EVENT_ID as NEW_EVENT_ID,MSISDN from table1"         
      +") group by TUMBLE(EVENTTIME,INTERVAL '1' MINUTE),NEW_EVENT_ID,MSISDN"); 
 
 
 
Table sip_distinct_event_id = tableEnv.sqlQuery( sql );           
 
tableEnv.toRetractStream(sip_distinct_event_id, Row.class).print();
env.execute("myjob2");
 
 
异常:
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: 
java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJob(MiniCluster.java:637)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.executeInternal(LocalStreamEnvironment.java:98)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1893)
at com.nsn.flink.service.DealRegisterFile13.main(DealRegisterFile13.java:98)
Caused by: java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
 
 
 
 

Re: RocksDB local snapshot sliently disappears and cause checkpoint to fail

2019-03-28 Thread Paul Lam
Hi Yu,

I’ve set `fs.default-scheme` to hdfs, and it's mainly used for simplifying 
checkpoint / savepoint / HA paths.

And I leave the rocksdb local dir empty, so the local snapshot still goes to 
YARN local cache dirs.

Hope that answers your question.

Best,
Paul Lam

> 在 2019年3月28日,15:34,Yu Li  写道:
> 
> Hi Paul,
> 
> Regarding "mistakenly uses the default filesystem scheme, which is specified 
> to hdfs in the new cluster in my case", could you further clarify the 
> configuration property and value you're using? Do you mean you're using an 
> HDFS directory to store the local snapshot data? Thanks.
> 
> Best Regards,
> Yu
> 
> 
> On Thu, 28 Mar 2019 at 14:34, Paul Lam  > wrote:
> Hi,
> 
> It turns out that under certain circumstances rocksdb statebackend mistakenly 
> uses the default filesystem scheme, which is specified to hdfs in the new 
> cluster in my case.
> 
> I’ve filed a Jira to track this[1]. 
> 
> [1] https://issues.apache.org/jira/browse/FLINK-12042 
> 
> 
> Best,
> Paul Lam
> 
>> 在 2019年3月27日,19:06,Paul Lam > > 写道:
>> 
>> Hi,
>> 
>> I’m using Flink 1.6.4 and recently I ran into a weird issue of rocksdb 
>> statebackend. A job that runs fine on a YARN cluster keeps failing on 
>> checkpoint after migrated to a new one 
>> (with almost everything the same but better machines), and even a clean 
>> restart doesn’t help. 
>> 
>> The root cause is IllegalStateException but with no error message. The stack 
>> trace shows that when the rocksdb statebackend is doing the async part of 
>> snapshots (runSnapshot), 
>> it finds that the local snapshot directory that is created by rocksdb 
>> earlier (takeSnapshot) does not exist. 
>> 
>> I tried to log more informations in RocksDBKeyedStateBackend (see 
>> attachment), and found that the local snapshot performed as expected and the 
>> .sst files were written, 
>> but when the async task accessed the directory, the whole snapshot directory 
>> was gone. 
>> 
>> What could possibly be the cause? Thanks a lot.
>> 
>> Best,
>> Paul Lam
>> 
>> http://rocksdb_illegal_state.log.md/>>
>> 
> 



Re: RocksDB local snapshot sliently disappears and cause checkpoint to fail

2019-03-28 Thread Yu Li
Hi Paul,

Regarding "mistakenly uses the default filesystem scheme, which is
specified to hdfs in the new cluster in my case", could you further clarify
the configuration property and value you're using? Do you mean you're using
an HDFS directory to store the local snapshot data? Thanks.

Best Regards,
Yu


On Thu, 28 Mar 2019 at 14:34, Paul Lam  wrote:

> Hi,
>
> It turns out that under certain circumstances rocksdb statebackend
> mistakenly uses the default filesystem scheme, which is specified to hdfs
> in the new cluster in my case.
>
> I’ve filed a Jira to track this[1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-12042
>
> Best,
> Paul Lam
>
> 在 2019年3月27日,19:06,Paul Lam  写道:
>
> Hi,
>
> I’m using Flink 1.6.4 and recently I ran into a weird issue of rocksdb
> statebackend. A job that runs fine on a YARN cluster keeps failing on
> checkpoint after migrated to a new one
> (with almost everything the same but better machines), and even a clean
> restart doesn’t help.
>
> The root cause is IllegalStateException but with no error message. The
> stack trace shows that when the rocksdb statebackend is doing the async
> part of snapshots (runSnapshot),
> it finds that the local snapshot directory that is created by rocksdb
> earlier (takeSnapshot) does not exist.
>
> I tried to log more informations in RocksDBKeyedStateBackend (see
> attachment), and found that the local snapshot performed as expected and
> the .sst files were written,
> but when the async task accessed the directory, the whole snapshot
> directory was gone.
>
> What could possibly be the cause? Thanks a lot.
>
> Best,
> Paul Lam
>
> 
>
>
>


Re: RocksDB local snapshot sliently disappears and cause checkpoint to fail

2019-03-28 Thread Paul Lam
Hi,

It turns out that under certain circumstances rocksdb statebackend mistakenly 
uses the default filesystem scheme, which is specified to hdfs in the new 
cluster in my case.

I’ve filed a Jira to track this[1]. 

[1] https://issues.apache.org/jira/browse/FLINK-12042 


Best,
Paul Lam

> 在 2019年3月27日,19:06,Paul Lam  写道:
> 
> Hi,
> 
> I’m using Flink 1.6.4 and recently I ran into a weird issue of rocksdb 
> statebackend. A job that runs fine on a YARN cluster keeps failing on 
> checkpoint after migrated to a new one 
> (with almost everything the same but better machines), and even a clean 
> restart doesn’t help. 
> 
> The root cause is IllegalStateException but with no error message. The stack 
> trace shows that when the rocksdb statebackend is doing the async part of 
> snapshots (runSnapshot), 
> it finds that the local snapshot directory that is created by rocksdb earlier 
> (takeSnapshot) does not exist. 
> 
> I tried to log more informations in RocksDBKeyedStateBackend (see 
> attachment), and found that the local snapshot performed as expected and the 
> .sst files were written, 
> but when the async task accessed the directory, the whole snapshot directory 
> was gone. 
> 
> What could possibly be the cause? Thanks a lot.
> 
> Best,
> Paul Lam
> 
> 
>