Re: Trouble sinking to Kafka

2022-02-23 Thread Nicolaus Weidner
Hi Marco,

The documentation kind of suggestion this is the cause:
> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html
>
> However, I think the documentation could benefit with a few examples and
> scenarios that can ill-considered configurations.
>

Matthias already provided a nice explanation of the usage of Kafka
transactions in EXACTLY_ONCE mode (thanks Matthias!), so I just want to
comment on the docs: I think you are right that this gotcha could be
pointed out explicitly. In fact, it is pointed out in the 1.14 docs [1]
(last sentence of the EXACTLY_ONCE
description), so others obviously thought the same.
If you feel it should still be added to the 1.12 docs, feel free to open a
Jira ticket or even provide a PR yourself :-) [2].

Best,
Nico

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance
[2] https://flink.apache.org/contributing/how-to-contribute.html


>


RE: Trouble sinking to Kafka

2022-02-23 Thread Schwalbe Matthias
Good morning Marco,


Your fix is pretty plausible:

  *   Kafka transactions get started at the beginning of a checkpoint period 
and contain all events collected through this period,
  *   At the end of the checkpoint period the associated transaction is 
committed and concurrently the transaction of the next checkpoint period is 
started
  *   In your case (checkpoint period + minimum distance) always last at least 
1 minute and hence the transaction timeouts
Kafka transactions work a little different to traditional RDBMS transactions:

  *   They are basically a pointer offset in each kafka partition that marks a 
range of pending events to be committed
  *   A kafka reader can either read-uncommitted, and sees these uncommitted 
events immediately or
  *   If in read-committed mode: needs to wait for the committed record offset 
(per partition) to advance
  *   If transactions don’t commit, such reader effectively gets halted
  *   Kafka transaction timeout is a means to prevent consumers to get blocked 
for too long if one of the producers fails to commit (e.g. crashed)

Your fix to increase kafka transaction timeout is sound in this respect.

Documentation on the kafka page is very detailed …

Open questions? … get back to the community 

Cheers

Thias


From: Marco Villalobos 
Sent: Mittwoch, 23. Februar 2022 19:11
To: Nicolaus Weidner 
Cc: user 
Subject: Re: Trouble sinking to Kafka

I fixed this, but I'm not 100% sure why.

Here is my theory:

My checkpoint interval is one minute, and the minimum pause interval is also 
one minute. My transaction timeout time is also one minute. I think the 
checkpoint causes Flink to hold the transaction open for one minute, and thus 
it times out.

After I changed the 
transaction.max.timeout.ms to one hour, and 
the transaction.timeout.ms to five minutes, it 
all worked like a charm.

Is my theory correct?

The documentation kind of suggestion this is the cause:  
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html

However, I think the documentation could benefit with a few examples and 
scenarios that can ill-considered configurations.

Thank you.

On Wed, Feb 23, 2022 at 9:29 AM Nicolaus Weidner 
mailto:nicolaus.weid...@ververica.com>> wrote:
Hi Marco,

I'm no expert on the Kafka producer, but I will try to help. [1] seems to have 
a decent explanation of possible error causes for the error you encountered.
Which leads me to two questions:


if (druidProducerTransactionMaxTimeoutMs > 0) {

  
properties.setProperty("transaction.max.timeout.ms",
 Integer.toString(druidProducerTransactionMaxTimeoutMs));
   }
   if (druidProducerTransactionTimeoutMs > 0) {
  
properties.setProperty("transaction.timeout.ms", 
Integer.toString(druidProducerTransactionTimeoutMs));
   }

Have you tried increasing the timeout settings, to see if transactions timed 
out?


   properties.setProperty("transactional.id", 
"local.druid");

Do you use multiple producers (parallelism > 1)? It seems you always set the 
same transactional.id, which I expect causes problems 
when you have multiple producer instances (see "zombie fencing" in [2]). In 
that case, just make sure they are unique.

And one additional question: Does the error occur consistently, or only 
occasionally?

Best,
Nico

[1] 
https://stackoverflow.com/questions/53058715/what-is-reason-for-getting-producerfencedexception-during-producer-send
[2] https://stackoverflow.com/a/52304789
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


?????? Flink????????HDFS

2022-02-23 Thread Tony




----
??: "wenjie li"

Re: Flink数据写入HDFS

2022-02-23 Thread wenjie li
1. 使用 BucketingSink 写入HDFS 可以配置滚动策略来决定写文件的大小
2. 如果由于写入频率大和输出数据量比较小的情况第一种方案不是很好,可以考虑在后面另外启动一个合并小文件的定时任务。

Tony <1298877...@qq.com.invalid> 于2022年2月24日周四 12:10写道:

> Flink数据写入HDFS,如何解决小文件问题?
> FlinkSQL有小文件合并策略,Flink dataStream 写入HDFS,如何解决?


Re: Restart from checkpoint - kinesis consumer

2022-02-23 Thread Vijayendra Yadav
Thanks Danny, Let me comeback with results.

> 
> On Feb 23, 2022, at 3:41 AM, Danny Cranmer  wrote:
> 
> 
> Hello Vijay,
> 
> > Once i do that my flink consumer need to be restarted with changed 
> > parallelism.
> Why is this? The Flink consumer continuously scans for new shards, and will 
> auto scale up/down the number of shard consumer threads to accommodate 
> Kinesis resharding. Flink job/operator parallelism does not need to match the 
> number of Kinesis shards.
> 
> > How do i solve this problem of restarting from existing checkpoint which 
> > was created with respect to N shards but now while restating we have more 
> > or less shards?
> The consumer should handle this. When the source starts it discovers active 
> shards on the stream and attaches available state on a per shard basis. What 
> is the problem here, is there an Exception you can share?
> 
> Thanks,
> 
> 
>> On Wed, Feb 23, 2022 at 11:25 AM Vijayendra Yadav  
>> wrote:
>> Hi Team,
>> 
>> I am running flink 1.11 kinesis consumer with say N kinesis shards, but i 
>> want to increase/decrease shards to N+M or N-M.
>> Once i do that my flink consumer need to be restarted with changed 
>> parallelism. 
>> But i am unable to restart from existing checkpoint because of change in 
>> number of shards.
>> 
>> How do i solve this problem of restarting from existing checkpoint which was 
>> created with respect to N shards but now while restating we have more or 
>> less shards?
>> 
>> Thanks,
>>  Vijay


[Flink-1.14.3] Restart of pod due to duplicatejob submission

2022-02-23 Thread Parag Somani
Hello,

Recently due to log4j vulnerabilities, we have upgraded to Apache Flink
1.14.3. What we observed we are getting following exception, and because of
it pod gets in crashloopback. We have seen this issues esp. during the time
of upgrade or deployment time when existing pod is already running.

What would it be causing this issue during deployment time? Any assistance
as a workaround would be much appreciated.

Also, i am seeing this issue only after upgrade from 1.14.2 to 1.14.3 .

Env:
Deployed on : k8s
Flink version: 1.14.3
HA using zookeeper

Logs:
2022-02-23 05:13:14.555 ERROR 45 --- [t-dispatcher-17]
c.b.a.his.service.FlinkExecutorService   : Failed to execute job

org.apache.flink.util.FlinkException: Failed to execute job 'events rates
calculation'.
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2056)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
~[flink-clients_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
~[flink-clients_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
com.bmc.ade.his.service.FlinkExecutorService.init(FlinkExecutorService.java:37)
~[health-service-1.0.00.jar:1.0.00]
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) ~[na:na]
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[na:na]
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
~[na:na]
at
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:602)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:524)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:944)
~[spring-beans-5.3.4.jar:5.3.4]
at
org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:917)
~[spring-context-5.3.4.jar:5.3.4]
at
org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:582)
~[spring-context-5.3.4.jar:5.3.4]
at
org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754)
~[spring-boot-2.5.5.jar:2.5.5]
at
org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434)
~[spring-boot-2.5.5.jar:2.5.5]
at
org.springframework.boot.SpringApplication.run(SpringApplication.java:338)
~[spring-boot-2.5.5.jar:2.5.5]
at
org.springframework.boot.SpringApplication.run(SpringApplication.java:1343)
~[spring-boot-2.5.5.jar:2.5.5]
at

Flink????????HDFS

2022-02-23 Thread Tony
FlinkHDFS??
FlinkSQL??Flink dataStream HDFS

回复:hive 进行 overwrite 合并数据后文件变大?

2022-02-23 Thread Shuai Xia
emmm,你看下这个能不能帮到你
https://jxeditor.github.io/2020/06/10/Hive%E5%8E%8B%E7%BC%A9%E6%95%88%E6%9E%9C%E4%B8%8D%E6%98%8E%E6%98%BE%E8%B8%A9%E5%9D%91%E8%AE%B0%E5%BD%95/


--
发件人:RS 
发送时间:2022年2月22日(星期二) 09:36
收件人:user-zh 
主 题:hive 进行 overwrite 合并数据后文件变大?

Hi,
flink写hive任务,checkpoint周期配置的比较短,生成了很多小文件,一天一个目录,
然后我调用flink sql合并之前的数据,跑完之后,发现存储变大了,请教下这个是什么原因导致的?
合并之前是很多小part文件,overwrite之后文件减少了,但是存储变大了,从274MB变大成2.9GB了?


hive表table1的分区字段是`date`
insert overwrite aw_topic_compact select * from `table1` where 
`date`='2022-02-21';


合并前:
514.0 M  1.5 G/user/hive/warehouse/ods.db/table1/date=2022-02-20
274.0 M  822.1 M  /user/hive/warehouse/ods.db/table1/date=2022-02-21
48.1 M   144.2 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22



合并后:
514.0 M  1.5 G/user/hive/warehouse/ods.db/table1/date=2022-02-20
2.9 G8.7 G/user/hive/warehouse/ods.db/table1/date=2022-02-21
47.6 M   142.9 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22




Re: 状态初始化

2022-02-23 Thread Jie Han
可以参考 CheckpointedFunction::initializeState

> 2022年2月23日 下午8:21,huangzhi...@iwgame.com 写道:
> 
> 
> flink是否能够做到程序第一次启动还没有checkpoint的情况下,对状态进行初始化?
> 
> 
> huangzhi...@iwgame.com



Re: Flink SQL: HOW to define `ANY` subtype in constructured Constructured Data Types(MAP, ARRAY...)

2022-02-23 Thread Jie Han
There is no built-in LogicType for ’ANY’, it’s a invalid token

> 2022年2月23日 下午10:29,zhouhaifengmath  写道:
> 
> 
> When I define a udf paramters like:
> public @DataTypeHint("Row") Row 
> eval(@DataTypeHint("MAP") Map mapData)
> 
> It gives error:
> Please check for implementation mistakes and/or provide a corresponding 
> hint.
>at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
>at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
>at 
> org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
>at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>... 20 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in 
> extracting a signature to output mapping.
>at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
>at 
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
>at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
>at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>... 23 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to extract 
> a type inference from method:
> public org.apache.flink.types.Row 
> com.netease.nie.sql.udfs.P1P2.eval(java.util.Map)
>at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
>at 
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
>at 
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
>... 25 more
> Caused by: org.apache.flink.table.api.TableException: User-defined types are 
> not supported yet.
>at 
> org.apache.flink.table.catalog.DataTypeFactoryImpl.resolveType(DataTypeFactoryImpl.java:189)
>at 
> org.apache.flink.table.catalog.DataTypeFactoryImpl.access$100(DataTypeFactoryImpl.java:50)
>at 
> org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:178)
>at 
> org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:171)
>at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:202)
>at 
> org.apache.flink.table.types.logical.UnresolvedUserDefinedType.accept(UnresolvedUserDefinedType.java:104)
>at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator.visit(LogicalTypeDuplicator.java:63)
>at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator.visit(LogicalTypeDuplicator.java:44)
>at 
> org.apache.flink.table.types.logical.MapType.accept(MapType.java:115)
>at 
> org.apache.flink.table.catalog.DataTypeFactoryImpl.createDataType(DataTypeFactoryImpl.java:80)
>at 
> org.apache.flink.table.types.extraction.DataTypeTemplate.extractDataType(DataTypeTemplate.java:297)
>at 
> org.apache.flink.table.types.extraction.DataTypeTemplate.fromAnnotation(DataTypeTemplate.java:112)
>at 
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:145)
>at 
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:409)
>at 
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:385)
>at java.util.Optional.orElseGet(Optional.java:267)
>at 
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:383)
>at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
>at 
> java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
>at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
>at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>at 
> 

状态初始化

2022-02-23 Thread huangzhi...@iwgame.com

flink是否能够做到程序第一次启动还没有checkpoint的情况下,对状态进行初始化?


huangzhi...@iwgame.com


状态初始化

2022-02-23 Thread huangzhi...@iwgame.com
大家好,我想请问下flink是否能够做到程序第一次启动还没有checkpoint的情况下,对状态进行初始化?



huangzhi...@iwgame.com


Re: 状态初始化

2022-02-23 Thread Jiangang Liu
作业在启动时可以使用 Processor API加载状态,可以参考
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/

huangzhi...@iwgame.com  于2022年2月23日周三 20:28写道:

>
> flink是否能够做到程序第一次启动还没有checkpoint的情况下,对状态进行初始化?
>
>
> huangzhi...@iwgame.com
>


Re: Trouble sinking to Kafka

2022-02-23 Thread Marco Villalobos
I fixed this, but I'm not 100% sure why.

Here is my theory:

My checkpoint interval is one minute, and the minimum pause interval is
also one minute. My transaction timeout time is also one minute. I think
the checkpoint causes Flink to hold the transaction open for one minute,
and thus it times out.

After I changed the *transaction.max.timeout.ms
* to one hour, and the
*transaction.timeout.ms
* to five minutes, it all worked like a
charm.

Is my theory correct?

The documentation kind of suggestion this is the cause:
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html

However, I think the documentation could benefit with a few examples and
scenarios that can ill-considered configurations.

Thank you.

On Wed, Feb 23, 2022 at 9:29 AM Nicolaus Weidner <
nicolaus.weid...@ververica.com> wrote:

> Hi Marco,
>
> I'm no expert on the Kafka producer, but I will try to help. [1] seems to
> have a decent explanation of possible error causes for the error you
> encountered.
> Which leads me to two questions:
>
>
> if (druidProducerTransactionMaxTimeoutMs > 0) {
>>
>>   properties.setProperty("transaction.max.timeout.ms", 
>> Integer.toString(druidProducerTransactionMaxTimeoutMs));
>>}
>>if (druidProducerTransactionTimeoutMs > 0) {
>>   properties.setProperty("transaction.timeout.ms", 
>> Integer.toString(druidProducerTransactionTimeoutMs));
>>}
>>
>>
> Have you tried increasing the timeout settings, to see if transactions
> timed out?
>
>
>>properties.setProperty("transactional.id", "local.druid");
>>
>>
> Do you use multiple producers (parallelism > 1)? It seems you always set
> the same transactional.id, which I expect causes problems when you have
> multiple producer instances (see "zombie fencing" in [2]). In that case,
> just make sure they are unique.
>
> And one additional question: Does the error occur consistently, or only
> occasionally?
>
> Best,
> Nico
>
> [1]
> https://stackoverflow.com/questions/53058715/what-is-reason-for-getting-producerfencedexception-during-producer-send
> [2] https://stackoverflow.com/a/52304789
>
>>


Re: Flink job recovery after task manager failure

2022-02-23 Thread Zhilong Hong
Hi, Afek!

When a TaskManager is killed, JobManager will not be acknowledged until a
heartbeat timeout happens. Currently, the default value of
heartbeat.timeout is 50 seconds [1]. That's why it takes more than 30
seconds for Flink to trigger a failover. If you'd like to shorten the time
a failover is triggered in this situation, you could decrease the value of
heartbeat.timeout in flink-conf.yaml. However, if the value is set too
small, heartbeat timeouts will happen more frequently and the cluster will
be unstable. As FLINK-23403 [2] mentions, if you are using Flink 1.14 or
1.15, you could try to set the value to 10s.

You mentioned that it takes 5-6 minutes to restart the jobs. It seems a bit
weird. How long does it take to deploy your job for a brand new launch? You
could compact and upload the log of JobManager to Google Drive or OneDrive
and attach the sharing link. Maybe we can find out what happens via the log.

Sincerely,
Zhilong

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
[2] https://issues.apache.org/jira/browse/FLINK-23403

On Thu, Feb 24, 2022 at 12:25 AM Afek, Ifat (Nokia - IL/Kfar Sava) <
ifat.a...@nokia.com> wrote:

> Hi,
>
>
>
> I am trying to use Flink checkpoints solution in order to support task
> manager recovery.
>
> I’m running flink using beam with filesystem storage and the following
> parameters:
>
> checkpointingInterval=3
>
> checkpointingMode=EXACTLY_ONCE.
>
>
>
> What I see is that if I kill a task manager pod, it takes flink about 30
> seconds to identify the failure and another 5-6 minutes to restart the jobs.
>
> Is there a way to shorten the downtime? What is an expected downtime in
> case the task manager is killed, until the jobs are recovered? Are there
> any best practices for handling it? (e.g. different configuration
> parameters)
>
>
>
> Thanks,
>
> Ifat
>
>
>


Re: Trouble sinking to Kafka

2022-02-23 Thread Nicolaus Weidner
Hi Marco,

I'm no expert on the Kafka producer, but I will try to help. [1] seems to
have a decent explanation of possible error causes for the error you
encountered.
Which leads me to two questions:


if (druidProducerTransactionMaxTimeoutMs > 0) {
>
>   properties.setProperty("transaction.max.timeout.ms", 
> Integer.toString(druidProducerTransactionMaxTimeoutMs));
>}
>if (druidProducerTransactionTimeoutMs > 0) {
>   properties.setProperty("transaction.timeout.ms", 
> Integer.toString(druidProducerTransactionTimeoutMs));
>}
>
>
Have you tried increasing the timeout settings, to see if transactions
timed out?


>properties.setProperty("transactional.id", "local.druid");
>
>
Do you use multiple producers (parallelism > 1)? It seems you always set
the same transactional.id, which I expect causes problems when you have
multiple producer instances (see "zombie fencing" in [2]). In that case,
just make sure they are unique.

And one additional question: Does the error occur consistently, or only
occasionally?

Best,
Nico

[1]
https://stackoverflow.com/questions/53058715/what-is-reason-for-getting-producerfencedexception-during-producer-send
[2] https://stackoverflow.com/a/52304789

>


Flink job recovery after task manager failure

2022-02-23 Thread Afek, Ifat (Nokia - IL/Kfar Sava)
Hi,

I am trying to use Flink checkpoints solution in order to support task manager 
recovery.
I’m running flink using beam with filesystem storage and the following 
parameters:
checkpointingInterval=3
checkpointingMode=EXACTLY_ONCE.

What I see is that if I kill a task manager pod, it takes flink about 30 
seconds to identify the failure and another 5-6 minutes to restart the jobs.
Is there a way to shorten the downtime? What is an expected downtime in case 
the task manager is killed, until the jobs are recovered? Are there any best 
practices for handling it? (e.g. different configuration parameters)

Thanks,
Ifat



Flink SQL: HOW to define `ANY` subtype in constructured Constructured Data Types(MAP, ARRAY...)

2022-02-23 Thread zhouhaifengmath










When I define a udf paramters like:    public @DataTypeHint("Row") Row eval(@DataTypeHint("MAP")     Map mapData)It gives error:    Please check for implementation mistakes and/or provide a corresponding hint.        at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)        at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)        at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)        at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)        at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)        ... 20 moreCaused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.        at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)        at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)        at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)        ... 23 moreCaused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:public org.apache.flink.types.Row com.netease.nie.sql.udfs.P1P2.eval(java.util.Map)        at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)        ... 25 moreCaused by: org.apache.flink.table.api.TableException: User-defined types are not supported yet.        at org.apache.flink.table.catalog.DataTypeFactoryImpl.resolveType(DataTypeFactoryImpl.java:189)        at org.apache.flink.table.catalog.DataTypeFactoryImpl.access$100(DataTypeFactoryImpl.java:50)        at org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:178)        at org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:171)        at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:202)        at org.apache.flink.table.types.logical.UnresolvedUserDefinedType.accept(UnresolvedUserDefinedType.java:104)        at org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator.visit(LogicalTypeDuplicator.java:63)        at org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator.visit(LogicalTypeDuplicator.java:44)        at org.apache.flink.table.types.logical.MapType.accept(MapType.java:115)        at org.apache.flink.table.catalog.DataTypeFactoryImpl.createDataType(DataTypeFactoryImpl.java:80)        at org.apache.flink.table.types.extraction.DataTypeTemplate.extractDataType(DataTypeTemplate.java:297)        at org.apache.flink.table.types.extraction.DataTypeTemplate.fromAnnotation(DataTypeTemplate.java:112)        at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:145)        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:409)        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:385)        at java.util.Optional.orElseGet(Optional.java:267)        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:383)        at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)        at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)        at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)        at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:387)        at 

Re: Task manager errors with Flink ZooKeeper High Availability

2022-02-23 Thread Sigalit Eliazov
Thanks for the response on this issue.
with the same configuration defined

*high-availability: zookeeper*

*high-availability.zookeeper.quorum: zk-noa-edge-infra:2181*

*high-availability.zookeeper.path.root: /flink*

*high-availability.storageDir: /flink_state*

*high-availability.jobmanager.port: 6124*

for the storageDir, we are using a k8s persistent volume with ReadWriteOnce


Currently we are facing a new issue. the task managers are able to register
to the job manager via this address */:6124*

 but when trying to run tasks we get an error from a BlobClient


java.io.IOException: Failed to fetch BLOB
e78e9574da4f5e4bdbc8de9678ebfb36/p-650534cd619de1069630141f1dcc9876d6ce2ce0-ee11ae52caa20ff81909708a783fd596
from /: and store it under
/hadoop/yarn/local/usercache/hdfs/appcache/application_1570784539965_0165/blobStore-79420f3a-6a83-40d4-8058-f01686a1ced8/incoming/temp-0072
at
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:169)
at
org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
at
org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
at
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
at
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not connect to BlobServer at address
*/:6124*
at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100)
at
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
... 7 more
Caused by: java.net.SocketException: 
at java.net.Socket.createImpl(Socket.java:460)
at java.net.Socket.connect(Socket.java:587)
at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95)


We see these errors both on the task manager and on the job manager.

regarding the job manager: does he access himself also via some external
service in order to fetch the blob?

Thanks

Sigalit

On Thu, Feb 17, 2022 at 3:51 PM Chesnay Schepler  wrote:

> You could reduce the log level of the PermanentBlobCache to WARN via the
> Log4j configuration.
> I think you could even filter this specific message with Log4j.
>
> On 17/02/2022 14:35, Koffman, Noa (Nokia - IL/Kfar Sava) wrote:
>
> Thanks,
>
> I understand that the functionality isn’t affected, this is very good news.
>
> But is there a way to either skip this check or skip logging it? We see it
> in our log more the 400 times per task manager.
>
> It would be very helpful if the log level could be reduced, or the check
> could be skipped? Is there any way to achieve this?
>
>
>
> Thanks
>
> Noa
>
>
>
> *From: *Chesnay Schepler  
> *Date: *Thursday, 17 February 2022 at 15:00
> *To: *Koffman, Noa (Nokia - IL/Kfar Sava) 
> , Yun Gao 
> , user 
> 
> *Subject: *Re: Task manager errors with Flink ZooKeeper High Availability
>
> Everything is fine.
>
>
>
> The TM tries to retrieve the jar (aka, the blob), and there is a fast path
> to access it directly from storage. This fails (because it has no access to
> it), and then falls back to retrieving it from the JM.
>
>
>
> On 17/02/2022 13:49, Koffman, Noa (Nokia - IL/Kfar Sava) wrote:
>
> Hi,
>
> Thanks for your reply,
>
> Please see below the full stack trace, and the log message right after, it
> looks like it is trying to download via BlobClient after failing to
> download from store, as you have suggested.
>
> My question is, is there a way to avoid this attempt to copy from blob
> store? Is my configuration of task manager wrong?
>
> Currently we are using the same flink-conf.yaml file for both job manager
> and task managers, which include the high-availability configuration
> mentioned below, should these be remove from the task managers?
>
>
>
>
>
> *2022-02-17 07:19:45,408 INFO
> org.apache.flink.runtime.blob.PermanentBlobCache [] - Failed to
> copy from blob store. Downloading from BLOB server instead.*
>
> *java.io.FileNotFoundException:
> /flink_state/default/blob/job_0ddba6dd21053567981e11bda8da7c8e/blob_p-4ff16c9e641ba8803e55d62a6ab2f6d05512373e-92d38bfb5719024adf4c72b086184d76
> (No such file or directory)*
>
> *at java.io.FileInputStream.open0(Native Method) ~[?:?]*
>
> *at java.io.FileInputStream.open(Unknown Source) ~[?:?]*
>
> *at java.io.FileInputStream.(Unknown Source) ~[?:?]*
>
> *at
> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
> ~[flink-dist_2.11-1.13.5.jar:1.13.5]*
>
> *at
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
> ~[flink-dist_2.11-1.13.5.jar:1.13.5]*
>
> *  

状态初始化

2022-02-23 Thread huangzhi...@iwgame.com

flink是否能够做到程序第一次启动还没有checkpoint的情况下,对状态进行初始化?


huangzhi...@iwgame.com


Re: Restart from checkpoint - kinesis consumer

2022-02-23 Thread Danny Cranmer
Hello Vijay,

> Once i do that my flink consumer need to be restarted with changed
parallelism.
Why is this? The Flink consumer continuously scans for new shards, and will
auto scale up/down the number of shard consumer threads to
accommodate Kinesis resharding. Flink job/operator parallelism does not
need to match the number of Kinesis shards.

> How do i solve this problem of restarting from existing checkpoint which
was created with respect to N shards but now while restating we have more
or less shards?
The consumer should handle this. When the source starts it discovers active
shards on the stream and attaches available state on a per shard basis.
What is the problem here, is there an Exception you can share?

Thanks,


On Wed, Feb 23, 2022 at 11:25 AM Vijayendra Yadav 
wrote:

> Hi Team,
>
> I am running flink 1.11 kinesis consumer with say N kinesis shards, but i
> want to increase/decrease shards to N+M or N-M.
> Once i do that my flink consumer need to be restarted with changed
> parallelism.
> But i am unable to restart from existing checkpoint because of change in
> number of shards.
>
> How do i solve this problem of restarting from existing checkpoint which
> was created with respect to N shards but now while restating we have more
> or less shards?
>
> Thanks,
>  Vijay


Restart from checkpoint - kinesis consumer

2022-02-23 Thread Vijayendra Yadav
Hi Team,

I am running flink 1.11 kinesis consumer with say N kinesis shards, but i want 
to increase/decrease shards to N+M or N-M.
Once i do that my flink consumer need to be restarted with changed parallelism. 
But i am unable to restart from existing checkpoint because of change in number 
of shards.

How do i solve this problem of restarting from existing checkpoint which was 
created with respect to N shards but now while restating we have more or less 
shards?

Thanks,
 Vijay

Re: java.lang.Exception: Job leader for job id 0efd8681eda64b072b72baef58722bc0 lost leadership.

2022-02-23 Thread Jai Patel
Hi Nico,

Thanks for getting back to us.  We are using Flink 1.14.0 and we are using
RocksDB.  We currently are using the default memory settings.  We'll look
into increasing our managed memory fraction to 0.6 and see what happens.

Do writes to ValueStates/MapStates have a direct on churn of the Flink
State or is the data buffered in between?

Thanks.
Jai

On Wed, Feb 23, 2022 at 12:55 AM Nicolaus Weidner <
nicolaus.weid...@ververica.com> wrote:

> Hi Jai,
>
> On Tue, Feb 22, 2022 at 9:19 PM Jai Patel 
> wrote:
>
>> It seems like the errors are similar to those discussed here:
>> - https://issues.apache.org/jira/browse/FLINK-14316
>> - https://cdmana.com/2020/11/20201116104527255b.html
>>
>
> I couldn't find any other existing issue apart from the one you already
> linked. Just to be sure: Which Flink version are you using? Is it one where
> the reported issue is fixed?
>
> As for the issue itself, it looks like the connection between JobManager
> and TaskManager was lost, though I can't tell why. Do you have full logs
> from JobManager and TaskManager surrounding such an incident?
>
>
>> When looking at the memory structure it looks like all memory is below
>> 100% except for managed memory.  We have 9.1GB of managed memory for each
>> of our 6 task managers and I estimate that our total Flink State is 600GB.
>> Is it okay for run with that little memory for that much State?
>>
>
> Are you using RocksDB or HashMap state backend [1]? I assume it's RocksDB,
> since with HashMapStateBackend, state size is limited by memory size (and
> you are way above that). Did you check out the memory configuration
> recommendations in the docs [2, 3]?
> In principle (assuming RocksDB is used), I don't think the amount of
> memory should be an issue (at least it shouldn't cause crashes). The logs
> would help to understand what's happening.
>
> Best,
> Nico
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_tuning/#configure-memory-for-state-backends
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#tuning-rocksdb-memory
>


退订

2022-02-23 Thread 谭 海棠
退订

获取 Outlook for iOS


Re: java.lang.Exception: Job leader for job id 0efd8681eda64b072b72baef58722bc0 lost leadership.

2022-02-23 Thread Nicolaus Weidner
Hi Jai,

On Tue, Feb 22, 2022 at 9:19 PM Jai Patel 
wrote:

> It seems like the errors are similar to those discussed here:
> - https://issues.apache.org/jira/browse/FLINK-14316
> - https://cdmana.com/2020/11/20201116104527255b.html
>

I couldn't find any other existing issue apart from the one you already
linked. Just to be sure: Which Flink version are you using? Is it one where
the reported issue is fixed?

As for the issue itself, it looks like the connection between JobManager
and TaskManager was lost, though I can't tell why. Do you have full logs
from JobManager and TaskManager surrounding such an incident?


> When looking at the memory structure it looks like all memory is below
> 100% except for managed memory.  We have 9.1GB of managed memory for each
> of our 6 task managers and I estimate that our total Flink State is 600GB.
> Is it okay for run with that little memory for that much State?
>

Are you using RocksDB or HashMap state backend [1]? I assume it's RocksDB,
since with HashMapStateBackend, state size is limited by memory size (and
you are way above that). Did you check out the memory configuration
recommendations in the docs [2, 3]?
In principle (assuming RocksDB is used), I don't think the amount of memory
should be an issue (at least it shouldn't cause crashes). The logs would
help to understand what's happening.

Best,
Nico

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_tuning/#configure-memory-for-state-backends
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#tuning-rocksdb-memory


Re: CSV join in batch mode

2022-02-23 Thread Guowei Ma
Hi, Killian
Sorry for responding late!
I think there is no simple way that could catch csv processing errors. That
means that you need to do it yourself.(Correct me if I am missing
something).
I think you could use RockDB State Backend[1], which would spill data to
disk.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/state_backends/#rocksdb-state-backend-details

Best,
Guowei


On Mon, Feb 21, 2022 at 6:33 PM Killian GUIHEUX <
killian.guiheu...@thalesdigital.io> wrote:

> Hello all,
>
> I have to perform a join between two large csv sets that do not fit in
> ram. I process this two files in batch mode. I also need a side output to
> catch csv processing errors.
> So my question is what is the best way to this kind of join operation ? I
> think I should use a valueState state backend but would it work if my ram
> is my states goes larger than my RAM ?
>
> Regards.
>
> Killian
>
> This message contains confidential information and is intended only for
> the individual(s) addressed in the message. If you are not the named
> addressee, you should not disseminate, distribute, or copy this e-mail. If
> you are not the intended recipient, you are notified that disclosing,
> distributing, or copying this e-mail is strictly prohibited.
>