Re: Strange issue with exactly once checkpoints and the kafka sink

2022-11-16 Thread Salva Alcántara
As noted in the SO post, it's a bit confusing to me how the `checkpointing.mode`
delivery guarantee interacts with the ones for the different sinks, and in
particular with the Kafka one.

Based on the error I had, I understand that if I use `EXACTLY_ONCE` for the
checkpoints and indicate nothing in the Kafka sink, the default guarantee
for it is overridden and/or transactions are used anyway (???).

Does the checkpointing.mode guarantee really override the default one for
the Kafka sink? If so, would something like this

```
// setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // don't call this, so the Kafka sink adapts automatically
setTransactionalIdPrefix("XYZ") // just in case transactions are required
```

make the Kafka sink automatically adapt to the checkpointing.mode (that is,
use the same guarantee), or should I instead explicitly set both
guarantees? E.g.,

```
execution.checkpointing.mode='EXACTLY_ONCE'
```

plus

```
setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
setTransactionalIdPrefix("XYZ")
```

Or, for `AT_LEAST_ONCE`:

```
execution.checkpointing.mode='AT_LEAST_ONCE'
```

plus

```
setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
// setTransactionalIdPrefix("XYZ") // not required in this case
```
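
For concreteness, here is a minimal sketch of the fully explicit exactly-once variant I have in mind (broker address, topic and prefix are placeholders; depending on the connector version the setter may be spelled `setDeliverGuarantee`, as in the Javadoc quoted below):

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExplicitExactlyOnceSinkSketch {

    static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")                 // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")           // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // explicitly match execution.checkpointing.mode
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // required whenever the sink uses transactions
                .setTransactionalIdPrefix("XYZ")
                .build();
    }
}
```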

Any clarifications on this would be highly appreciated. Maybe you can point
me to the relevant code (or docs) if the interaction between those
guarantees is already well-documented.

Thanks in advance,

Salva


On Mon, Nov 7, 2022 at 8:06 AM Salva Alcántara 
wrote:

> I had a Flink 1.15.1 job configured with
>
> ```
> execution.checkpointing.mode=`EXACTLY_ONCE`
> ```
>
> that was failing with the following error
> ```
> Sink: Committer (2/2)#732 (36640a337c6ccdc733d176b18adab979) switched from
> INITIALIZING to FAILED with failure cause: java.lang.IllegalStateException:
> Failed to commit KafkaCommittable{producerId=4521984, epoch=0,
> transactionalId=}
> ...
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value
>  for configuration transactional.id: String must be non-empty
> ```
>
> that happened after the first checkpoint was triggered. The strange thing
> about it is that the `KafkaSinkBuilder` was used without calling
> `setDeliverGuarantee`, and hence the default delivery guarantee was
> expected to be used, which is `NONE` [1].
>
> Is that even possible to start with? Shouldn't kafka transactions be
> involved only when one follows [this recipe] [2]:
>
>  * One can also configure different {@link DeliveryGuarantee} by using
> {@link
>  * #setDeliverGuarantee(DeliveryGuarantee)} but keep in mind when using
> {@link
>  * DeliveryGuarantee#EXACTLY_ONCE} one must set the transactionalIdPrefix
> {@link
>  * #setTransactionalIdPrefix(String)}.
>
> So, in my case, without calling `setDeliverGuarantee` (nor
> `setTransactionalIdPrefix`), I cannot understand why I was seeing these
> errors. To avoid the problem, I temporarily changed the checkpointing
> settings to
>
> ```
> execution.checkpointing.mode=`AT_LEAST_ONCE`
> ```
>
> but I'd like to understand what was happening.
>
>
>   [1]:
> https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L66
>   [2]:
> https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L51
>
> FYI I've also posted this in SO here:
> -
> https://stackoverflow.com/questions/74342971/transactional-id-errors-when-using-kafka-sink-with-exactly-once-checkpoints
>


Re: flinksql join

2022-11-16 Thread Zhiwen Sun
If the dob_dim_account dimension table uses the jdbc connector, Flink reads all of
its data once during initialization; later updates in the database will not trigger
any Flink computation.

To solve this, dob_dim_account needs to become a stream (changelog) table.
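
A minimal sketch of one way to do that, registering dob_dim_account as a CDC changelog table (in line with the CDC suggestion quoted further down; this assumes the flink-connector-mysql-cdc dependency, and the hostname, credentials, database/table names and columns are placeholders):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DimAsChangelogSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register the dimension table as a changelog stream instead of a one-shot JDBC snapshot.
        tEnv.executeSql(
            "CREATE TABLE dob_dim_account (\n"
                + "  ACCT_CODE STRING,\n"
                + "  CORP_ID   STRING,\n"
                + "  PRIMARY KEY (ACCT_CODE) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector'     = 'mysql-cdc',\n"
                + "  'hostname'      = 'mysql-host',\n"   // placeholder
                + "  'port'          = '3306',\n"
                + "  'username'      = 'flink',\n"        // placeholder
                + "  'password'      = '***',\n"          // placeholder
                + "  'database-name' = 'dob',\n"          // placeholder
                + "  'table-name'    = 'dim_account'\n"   // placeholder
                + ")");

        // A regular join (without FOR SYSTEM_TIME AS OF) against this table is re-evaluated
        // when account rows are inserted or updated later; configure an idle-state TTL so the
        // join state does not grow without bound.
    }
}
```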


Zhiwen Sun



On Thu, Nov 17, 2022 at 1:56 PM Jason_H  wrote:

> Hi,
> This approach would require CDC, but in our current solution the leadership does not want to use CDC; they only want to solve this with Flink SQL.
>
>
> Jason_H
> hyb_he...@163.com
>  Replied Message 
> From: 任召金
> Date: 11/15/2022 09:52
> To: user-zh
> Subject: Re: flinksql join
> Hello, you could try turning the MySQL data into a stream via CDC and then doing an inner join with the main stream; mind the state TTL.
> 
> 
> -- Original --
> From: "Jason_H"
> Date: Tue, Nov 15, 2022 09:46 AM
> To: the flink Chinese mailing list
> Subject: Re: flinksql join
>
> 
>
> Hi,
> I'd like to implement this with the existing Flink SQL join: when the dimension table is updated late, the fact data should wait in state.
>
>
> Jason_H
> hyb_he...@163.com
>  Replied Message 
> From: RS
> Date: 11/15/2022 09:07
> To: user-zh@flink.apache.org
> Subject: Re: flinksql join
> Hi,
> My understanding is that it is expected behavior that dimension rows inserted later cannot be joined against earlier facts.
> To get a total of 3, you would have to manually re-run the historical data and then update the existing results.
>
>
> Thanks
>
>
>
>
>
>
> On 2022-11-11 11:10:03, "Jason_H" wrote:
>
> Hi everyone,
>
> I'm using Flink SQL to implement a dimension-table join. The source is Kafka
> (transaction data) and the dimension table is MySQL (accounts). The problem: when a
> record arrives from Kafka and its account is not found in the dimension table, I
> insert that account manually; the next record is then matched with the account info,
> but the accumulated output is missing the records that were not matched. For example:
> Kafka input:
> Account  Amount  Count
>  100 1 - not matched
>  100 1 - not matched
>  100 1 - matched
>
> Dimension table
> Account  Enterprise
>  
>   - account info inserted later
> Actual output:
> Enterprise  Amount  Count
>  100 1
>
>
> Desired result:
> Enterprise  Amount  Count
>  300 3
>
>
>
>
>
> The SQL is as follows:
> String sql2 = "insert into dws_b2b_trade_year_index\n" +
> "WITH temp AS (\n" +
> "select \n" +
> " ta.gmtStatistical as gmtStatistical,\n" +
> " ta.paymentMethod as paymentMethod,\n" +
> " tb.CORP_ID as outCorpId,\n" +
> " tc.CORP_ID as inCorpId,\n" +
> " sum(ta.tradeAmt) as tranAmount,\n" +
> " sum(ta.tradeCnt) as tranNum \n" +
> "from dws_a2a_trade_year_index ta \n" +
> "left join dob_dim_account for system_time as of ta.proc as tb on
> ta.outAcctCode = tb.ACCT_CODE \n" +
> "left join dob_dim_account for system_time as of ta.proc as tc on
> ta.inAcctCode = tc.ACCT_CODE \n" +
> "group by \n" +
> " ta.gmtStatistical, \n" +
> " ta.paymentMethod, \n" +
> " tb.CORP_ID, \n" +
> " tc.CORP_ID \n" +
> ") \n" +
> "SELECT \n" +
> " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as
> gmtUpdate, \n" +
> " gmtStatistical, \n" +
> " paymentMethod, \n" +
> " outCorpId, \n" +
> " inCorpId, \n" +
> " tranAmount, \n" +
> " tranNum \n" +
> "FROM temp";
>
> Jason_H
> hyb_he...@163.com


Re: Dependency resolution issue with apache-flink 1.16.0 python package.

2022-11-16 Thread Xingbo Huang
Hi Yogi,

I think the problem comes from poetry depending on the metadata in PyPI.
This problem has been reported in
https://issues.apache.org/jira/browse/FLINK-29817 and I will fix it in
1.16.1.

Best,
Xingbo

Yogi Devendra wrote on Thu, Nov 17, 2022 at 06:21:

> Dear community/maintainers,
>
> Thanks for the lovely platform of Apache Flink.
> I got the following error when adding the apache-flink 1.16.0 dependency to my
> Python project.
>
> Given below is the stack trace for further investigation.
> When I tried using a lower version (1.15.2) instead, I was able to move
> forward.
>
> Can you please confirm if this is a bug or I am doing something wrong?
>
> -
>
> learn-flink git:(master) ✗ poetry add apache-flink --verbose
> Using virtualenv: /Users/d.vyavahare/repo/
> github.com/yogidevendra/python-warm-up/learn-flink/.venv
> Using version ^1.16.0 for apache-flink
>
> Updating dependencies
> Resolving dependencies... (2.4s)
>
>   SolveFailure
>
>   Because pemja (0.2.6) depends on numpy (1.21.4)
>and apache-flink (1.16.0) depends on numpy (>=1.14.3,<1.20), pemja
> (0.2.6) is incompatible with apache-flink (1.16.0).
>   And because apache-flink (1.16.0) depends on pemja (0.2.6), apache-flink
> is forbidden.
>   So, because no versions of apache-flink match >1.16.0,<2.0.0
>and learn-flink depends on apache-flink (^1.16.0), version solving
> failed.
>
>   at ~/Library/Application
> Support/pypoetry/venv/lib/python3.8/site-packages/poetry/mixology/version_solver.py:364
> in _resolve_conflict
>   360│ )
>   361│ self._log(f'! which is caused by
> "{most_recent_satisfier.cause}"')
>   362│ self._log(f"! thus: {incompatibility}")
>   363│
> → 364│ raise SolveFailure(incompatibility)
>   365│
>   366│ def _choose_package_version(self) -> str | None:
>   367│ """
>   368│ Tries to select a version of a required package.
>
> The following error occurred when trying to handle this error:
>
>
>   SolverProblemError
>
>   Because pemja (0.2.6) depends on numpy (1.21.4)
>and apache-flink (1.16.0) depends on numpy (>=1.14.3,<1.20), pemja
> (0.2.6) is incompatible with apache-flink (1.16.0).
>   And because apache-flink (1.16.0) depends on pemja (0.2.6), apache-flink
> is forbidden.
>   So, because no versions of apache-flink match >1.16.0,<2.0.0
>and learn-flink depends on apache-flink (^1.16.0), version solving
> failed.
>
>   at ~/Library/Application
> Support/pypoetry/venv/lib/python3.8/site-packages/poetry/puzzle/solver.py:159
> in _solve
>   155│ packages = result.packages
>   156│ except OverrideNeeded as e:
>   157│ return
> self.solve_in_compatibility_mode(e.overrides, use_latest=use_latest)
>   158│ except SolveFailure as e:
> → 159│ raise SolverProblemError(e)
>   160│
>   161│ combined_nodes =
> depth_first_search(PackageNode(self._package, packages))
>   162│ results = dict(aggregate_package_nodes(nodes) for nodes
> in combined_nodes)
>   163│
>
> 
>
> ~ Yogi
>


Re: flinksql join

2022-11-16 Thread Jason_H
Hi,
This approach would require CDC, but in our current solution the leadership does not want to use CDC; they only want to solve this with Flink SQL.


Jason_H
hyb_he...@163.com
 Replied Message 
From: 任召金
Date: 11/15/2022 09:52
To: user-zh
Subject: Re: flinksql join
Hello, you could try turning the MySQL data into a stream via CDC and then doing an inner join with the main stream; mind the state TTL.


--Original--
From: "Jason_H"

Dependency resolution issue with apache-flink 1.16.0 python package.

2022-11-16 Thread Yogi Devendra
Dear community/maintainers,

Thanks for the lovely platform of Apache Flink.
I got the following error when adding the apache-flink 1.16.0 dependency to my
Python project.

Given below is the stack trace for further investigation.
When I tried using a lower version (1.15.2) instead, I was able to move
forward.

Can you please confirm if this is a bug or I am doing something wrong?

-

learn-flink git:(master) ✗ poetry add apache-flink --verbose
Using virtualenv: /Users/d.vyavahare/repo/
github.com/yogidevendra/python-warm-up/learn-flink/.venv
Using version ^1.16.0 for apache-flink

Updating dependencies
Resolving dependencies... (2.4s)

  SolveFailure

  Because pemja (0.2.6) depends on numpy (1.21.4)
   and apache-flink (1.16.0) depends on numpy (>=1.14.3,<1.20), pemja
(0.2.6) is incompatible with apache-flink (1.16.0).
  And because apache-flink (1.16.0) depends on pemja (0.2.6), apache-flink
is forbidden.
  So, because no versions of apache-flink match >1.16.0,<2.0.0
   and learn-flink depends on apache-flink (^1.16.0), version solving
failed.

  at ~/Library/Application
Support/pypoetry/venv/lib/python3.8/site-packages/poetry/mixology/version_solver.py:364
in _resolve_conflict
  360│ )
  361│ self._log(f'! which is caused by
"{most_recent_satisfier.cause}"')
  362│ self._log(f"! thus: {incompatibility}")
  363│
→ 364│ raise SolveFailure(incompatibility)
  365│
  366│ def _choose_package_version(self) -> str | None:
  367│ """
  368│ Tries to select a version of a required package.

The following error occurred when trying to handle this error:


  SolverProblemError

  Because pemja (0.2.6) depends on numpy (1.21.4)
   and apache-flink (1.16.0) depends on numpy (>=1.14.3,<1.20), pemja
(0.2.6) is incompatible with apache-flink (1.16.0).
  And because apache-flink (1.16.0) depends on pemja (0.2.6), apache-flink
is forbidden.
  So, because no versions of apache-flink match >1.16.0,<2.0.0
   and learn-flink depends on apache-flink (^1.16.0), version solving
failed.

  at ~/Library/Application
Support/pypoetry/venv/lib/python3.8/site-packages/poetry/puzzle/solver.py:159
in _solve
  155│ packages = result.packages
  156│ except OverrideNeeded as e:
  157│ return self.solve_in_compatibility_mode(e.overrides,
use_latest=use_latest)
  158│ except SolveFailure as e:
→ 159│ raise SolverProblemError(e)
  160│
  161│ combined_nodes =
depth_first_search(PackageNode(self._package, packages))
  162│ results = dict(aggregate_package_nodes(nodes) for nodes
in combined_nodes)
  163│



~ Yogi


Re: Kafka transactions & flink checkpoints

2022-11-16 Thread Yaroslav Tkachenko
I gave a talk about that setup:
https://www.youtube.com/watch?v=tiGxEGPyqCg&ab_channel=FlinkForward

The documentation suggests using unaligned checkpoints in case of
backpressure (
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints).
I'd like to add that backpressure typically means you have data skew, which
causes checkpoints to take a long time to finalize, since different subtasks
take different amounts of time to finish. If you don't have data skew (your
state is uniformly distributed) and you don't have a lot of backpressure,
you shouldn't enable unaligned checkpoints, IMO. So my suggestion is to
analyze your workload and try to eliminate any data skews you find.

Btw, what storage do you use for your RocksDB state? I suggest using the
fastest SSD you can get; e.g. with cloud vendors, don't use something
like AWS EBS, use the ephemeral instance storage instead.
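
As a side note, here is a minimal sketch of the knobs discussed above (incremental RocksDB, checkpoint interval, aligned vs. unaligned checkpoints); the interval and the checkpoint storage path are placeholders to adapt to your workload:

```
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB with incremental checkpoints: only changed SST files are uploaded.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Exactly-once checkpoints every 60 s (placeholder interval).
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Keep aligned checkpoints unless backpressure/skew really forces unaligned ones.
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);

        // Checkpoint storage location (placeholder path); keep RocksDB itself on fast local disks.
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");
    }
}
```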



On Wed, Nov 16, 2022 at 4:49 AM Vishal Surana  wrote:

> Yes. I do use RocksDB for (incremental) checkpointing. During each
> checkpoint 15-20GB of state gets created (new state added, some expired). I
> make use of FIFO compaction.
>
> I’m a bit surprised you were able to run with 10+TB state without
> unaligned checkpoints because the performance in my application degrades
> quite a lot. Can you share your checkpoint configurations?
>
>
> Thanks,
> Vishal
> On 15 Nov 2022, 10:07 PM +0530, Yaroslav Tkachenko ,
> wrote:
>
> Hi Vishal,
>
> Just wanted to comment on this bit:
>
> > My job has very large amount of state (>100GB) and I have no option but
> to use unaligned checkpoints.
>
> I successfully ran Flink jobs with 10+ TB of state and no unaligned
> checkpoints enabled. Usually, you consider enabling them when there is some
> kind of skew in the topology, but IMO it's unrelated to the state size.
>
> > Reducing the checkpoint interval is not really an option given the size
> of the checkpoint
>
> Do you use RocksDB state backend with incremental checkpointing?
>
> On Tue, Nov 15, 2022 at 12:07 AM Vishal Surana 
> wrote:
>
>> I wanted to achieve exactly once semantics in my job and wanted to make
>> sure I understood the current behaviour correctly:
>>
>>1. Only one Kafka transaction at a time (no concurrent checkpoints)
>>2. Only one transaction per checkpoint
>>
>>
>> My job has very large amount of state (>100GB) and I have no option but
>> to use unaligned checkpoints. With the above limitation, it seems to me
>> that if checkpoint interval is 1 minute and checkpoint takes about 10
>> seconds to complete then only one Kafka transaction can happen in 70
>> seconds. All of the output records will not be visible until the
>> transaction completes. This way a steady stream of inputs will result in an
>> buffered output stream where data is only visible after a minute, thereby
>> destroying any sort of real time streaming use cases. Reducing the
>> checkpoint interval is not really an option given the size of the
>> checkpoint. Only way out would be to allow for multiple transactions per
>> checkpoint.
>>
>> Thanks,
>> Vishal
>>
>


Kubernetes operator and jobs with last-state upgrades

2022-11-16 Thread Alexis Sarda-Espinosa
Hello,

I am doing some tests with the operator and, if I'm not mistaken, using
last-state upgrade means that, when something is changed in the CR, no
savepoint is taken and the pods are simply terminated. Is that a
requirement from Flink HA? I would have thought last-state would still use
savepoints for upgrade if the current status is stable.

Regards,
Alexis.


Re: Owner reference with the Kubernetes operator

2022-11-16 Thread Alexis Sarda-Espinosa
Ah I see, cool, thanks.

Regards,
Alexis.

Am Mi., 16. Nov. 2022 um 15:50 Uhr schrieb Gyula Fóra :

> This has been changed in the current snapshot release:
> https://issues.apache.org/jira/browse/FLINK-28979
>
> It will be part of the 1.3.0 version.
>
> On Wed, Nov 16, 2022 at 3:32 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Is there a particular reason the operator doesn't set owner references
>> for the Deployments it creates as a result of a FlinkDeployment CR? This
>> makes tracking in the Argo CD UI impossible. (To be clear, I mean a
>> reference from the Deployment to the FlinkDeployment).
>>
>> Regards,
>> Alexis.
>>
>>


Re: Owner reference with the Kubernetes operator

2022-11-16 Thread Gyula Fóra
This has been changed in the current snapshot release:
https://issues.apache.org/jira/browse/FLINK-28979

It will be part of the 1.3.0 version.

On Wed, Nov 16, 2022 at 3:32 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> Is there a particular reason the operator doesn't set owner references for
> the Deployments it creates as a result of a FlinkDeployment CR? This makes
> tracking in the Argo CD UI impossible. (To be clear, I mean a reference
> from the Deployment to the FlinkDeployment).
>
> Regards,
> Alexis.
>
>


Savepoint restore mode for the Kubernetes operator

2022-11-16 Thread Alexis Sarda-Espinosa
Hello,

Is there a recommended configuration for the restore mode of jobs managed
by the operator?

Since the documentation states that the operator keeps a savepoint history
to perform cleanup, I imagine restore mode should always be NO_CLAIM, but I
just want to confirm.
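
For reference, this is the setting I mean (assuming it can simply be passed through the deployment's flinkConfiguration):

```
execution.savepoint-restore-mode: NO_CLAIM
```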

Regards,
Alexis.


Owner reference with the Kubernetes operator

2022-11-16 Thread Alexis Sarda-Espinosa
Hello,

Is there a particular reason the operator doesn't set owner references for
the Deployments it creates as a result of a FlinkDeployment CR? This makes
tracking in the Argo CD UI impossible. (To be clear, I mean a reference
from the Deployment to the FlinkDeployment).

Regards,
Alexis.


Re: Kafka transactions & flink checkpoints

2022-11-16 Thread Vishal Surana
Yes. I do use RocksDB for (incremental) checkpointing. During each checkpoint 
15-20GB of state gets created (new state added, some expired). I make use of 
FIFO compaction.

I’m a bit surprised you were able to run with 10+TB state without unaligned 
checkpoints because the performance in my application degrades quite a lot. Can 
you share your checkpoint configurations?


Thanks,
Vishal
On 15 Nov 2022, 10:07 PM +0530, Yaroslav Tkachenko , 
wrote:
> Hi Vishal,
>
> Just wanted to comment on this bit:
>
> > My job has very large amount of state (>100GB) and I have no option but to 
> > use unaligned checkpoints.
>
> I successfully ran Flink jobs with 10+ TB of state and no unaligned 
> checkpoints enabled. Usually, you consider enabling them when there is some 
> kind of skew in the topology, but IMO it's unrelated to the state size.
>
> > Reducing the checkpoint interval is not really an option given the size of 
> > the checkpoint
>
> Do you use RocksDB state backend with incremental checkpointing?
>
> > On Tue, Nov 15, 2022 at 12:07 AM Vishal Surana  wrote:
> > > I wanted to achieve exactly once semantics in my job and wanted to make 
> > > sure I understood the current behaviour correctly:
> > >
> > > 1. Only one Kafka transaction at a time (no concurrent checkpoints)
> > > 2. Only one transaction per checkpoint
> > >
> > >
> > > My job has very large amount of state (>100GB) and I have no option but 
> > > to use unaligned checkpoints. With the above limitation, it seems to me 
> > > that if checkpoint interval is 1 minute and checkpoint takes about 10 
> > > seconds to complete then only one Kafka transaction can happen in 70 
> > > seconds. All of the output records will not be visible until the 
> > > transaction completes. This way a steady stream of inputs will result in 
> > > an buffered output stream where data is only visible after a minute, 
> > > thereby destroying any sort of real time streaming use cases. Reducing 
> > > the checkpoint interval is not really an option given the size of the 
> > > checkpoint. Only way out would be to allow for multiple transactions per 
> > > checkpoint.
> > >
> > > Thanks,
> > > Vishal


Re: Kafka transactions drastically limit usability of Flink savepoints

2022-11-16 Thread Yordan Pavlov
Hi Piotr,

the option you mention is applicable only to the deprecated
FlinkKafkaProducer; is there an equivalent for the modern KafkaSink? I found
this article comparing the behavior of the two:
https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs

It suggests that the default behavior of KafkaSink would be: "The
recovery continues with an ERROR message like the following is
logged". However, this is not what I observe; instead, the job fails. I
am attaching the relevant part of the log. The error happens when
trying to recover from a one-month-old savepoint.

Regards,
Yordan

On Tue, 15 Nov 2022 at 18:53, Piotr Nowojski  wrote:
>
> Hi Yordan,
>
> I don't understand where the problem is, why do you think savepoints are 
> unusable? If you recover with `ignoreFailuresAfterTransactionTimeout` 
> enabled, the current Flink behaviour shouldn't cause any problems (except for 
> maybe some logged errors).
>
> Best,
> Piotrek
>
> wt., 15 lis 2022 o 15:36 Yordan Pavlov  napisał(a):
>>
>> Hi,
>> we are using Kafka savepoints as a recovery tool and want to store
>> multiple ones for the past months. However as we use Kafka
>> transactions for our KafkaSink this puts expiration time on our
>> savepoints. We can use a savepoint only as old as our Kafka
>> transaction timeout. The problem is explained in this issue:
>> https://issues.apache.org/jira/browse/FLINK-16419
>> the relative comment being this one:
>> "FlinkKafkaProducer or KafkaSink do not know during recovery if they
>> have to recover and commit or if it has already happened. Due to that,
>> they are always attempting to recover and commit transactions during
>> startup."
>> I'm surprised that more people are not hitting this problem as this
>> makes Savepoints pretty much unusable as a recovery mechanism.
2022-11-16 10:01:07.168 [flink-akka.actor.default-dispatcher-13] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Balances aggreagation 
ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> Save to Kafka 
realtime ETH: Committer, Filter -> Map -> Save to Kafka daily ETH: Writer -> 
Save to Kafka daily ETH: Committer) (4/5) (6d4d91ab8657bba830695b9a011f7db6) 
switched from INITIALIZING to RUNNING.
2022-11-16 10:01:37.222 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 65436 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1668592897201 for job 
.
2022-11-16 10:01:39.082 [flink-akka.actor.default-dispatcher-13] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Balances aggreagation 
ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> Save to Kafka 
realtime ETH: Committer, Filter -> Map -> Save to Kafka daily ETH: Writer -> 
Save to Kafka daily ETH: Committer) (1/5) (cfaca46e7f4dc89629cdcaed5b48c059) 
switched from RUNNING to FAILED on 10.42.145.181:33297-efc328 @ 
eth-top-holders-v2-flink-taskmanager-0.eth-top-holders-v2-flink-taskmanager.flink.svc.cluster.local
 (dataPort=43125).
java.io.IOException: Could not perform checkpoint 65436 for operator Balances 
aggreagation ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> 
Save to Kafka realtime ETH: Committer, Filter -> Map -> Save to Kafka daily 
ETH: Writer -> Save to Kafka daily ETH: Committer) (1/5)#0.
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at 

Re: Reading Parquet file with array of structs cause error

2022-11-16 Thread Jing Ge
Hi Michael,

Yeah, it will be addressed in FLINK-28867.
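
In the meantime, the Avro GenericRecord approach from [1] in the quoted thread below could serve as a workaround for the nested columns. A minimal sketch, where the schema.avsc file and the input path are placeholders and the Avro schema must mirror the full Parquet schema (including the nested array of structs):

```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;

public class ParquetGenericRecordSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Avro schema that mirrors the Parquet file schema (placeholder .avsc file).
        Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

        // Read the Parquet file as Avro GenericRecords instead of the columnar row format.
        FileSource<GenericRecord> source =
                FileSource.forRecordStreamFormat(
                                AvroParquetReaders.forGenericRecord(schema),
                                new Path("file:///path/to/input.parquet")) // placeholder path
                        .build();

        DataStream<GenericRecord> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source");

        stream.print();
        env.execute("parquet-generic-record-sketch");
    }
}
```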

Best regards,
Jing


On Wed, Nov 16, 2022 at 2:58 AM liu ron  wrote:

> It will be addressed in FLINK-28867.
>
> Best,
> Ron
>
> Benenson, Michael via user wrote on Wed, Nov 16, 2022 at 08:47:
>
>> Thanks, Jing
>>
>>
>>
>> Do you know if this problem will be addressed in FLINK-28867
>> or if it deserves a
>> separate Jira?
>>
>>
>>
>>
>>
>> *From: *Jing Ge 
>> *Date: *Tuesday, November 15, 2022 at 3:39 PM
>> *To: *Benenson, Michael 
>> *Cc: *user@flink.apache.org , Deshpande, Omkar <
>> omkar_deshpa...@intuit.com>, Vora, Jainik 
>> *Subject: *Re: Reading Parquet file with array of structs cause error
>>
>> This email is from an external sender.
>>
>>
>>
>> Hi Michael,
>>
>>
>>
>> Currently, ParquetColumnarRowInputFormat does not support schemas with
>> nested columns. If your parquet file stores Avro records. You might want to
>> try e.g. Avro Generic record[1].
>>
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record
>>
>>
>>
>> Best regards,
>>
>> Jing
>>
>>
>>
>>
>>
>> On Tue, Nov 15, 2022 at 8:52 PM Benenson, Michael via user <
>> user@flink.apache.org> wrote:
>>
>> Hi, folks
>>
>>
>>
>> I’m using flink 1.16.0, and I would like to read Parquet file (attached),
>> that has schema [1].
>>
>>
>>
>> I could read this file with Spark, but when I try to read it with Flink
>> 1.16.0 (program attached) using schema [2]
>>
>> I got IndexOutOfBoundsException [3]
>>
>>
>>
>> My code, and parquet file are attached. Is it:
>>
>> · the problem, described in FLINK-28867
>>  or
>>
>> · something new, that deserve a separate Jira, or
>>
>> · something wrong with my code?
>>
>>
>>
>> [1]: Parquet Schema
>>
>>
>>
>> root
>>
>> |-- amount: decimal(38,9) (nullable = true)
>>
>> |-- connectionAccountId: string (nullable = true)
>>
>> |-- sourceEntity: struct (nullable = true)
>>
>> ||-- extendedProperties: array (nullable = true)
>>
>> |||-- element: struct (containsNull = true)
>>
>> ||||-- key: string (nullable = true)
>>
>> ||||-- value: string (nullable = true)
>>
>> ||-- sourceAccountId: string (nullable = true)
>>
>> ||-- sourceEntityId: string (nullable = true)
>>
>> ||-- sourceEntityType: string (nullable = true)
>>
>> ||-- sourceSystem: string (nullable = true)
>>
>>
>>
>>
>>
>> [2]: Schema used in Flink:
>>
>>
>>
>> static RowType getSchema()
>>
>> {
>>
>> RowType elementType = RowType.of(
>>
>> new LogicalType[] {
>>
>> new VarCharType(VarCharType.MAX_LENGTH),
>>
>> new VarCharType(VarCharType.MAX_LENGTH)
>>
>> },
>>
>> new String[] {
>>
>> "key",
>>
>> "value"
>>
>> }
>>
>> );
>>
>>
>>
>> RowType element = RowType.of(
>>
>> new LogicalType[] { elementType },
>>
>> new String[] { "element" }
>>
>> );
>>
>>
>>
>> RowType sourceEntity = RowType.of(
>>
>> new LogicalType[] {
>>
>> new ArrayType(element),
>>
>> new VarCharType(),
>>
>> new VarCharType(),
>>
>> new VarCharType(),
>>
>> new VarCharType(),
>>
>> },
>>
>> new String[] {
>>
>> "extendedProperties",
>>
>> "sourceAccountId",
>>
>> "sourceEntityId",
>>
>> "sourceEntityType",
>>
>> "sourceSystem"
>>
>> }
>>
>> );
>>
>>
>>
>> return  RowType.of(
>>
>> new LogicalType[] {
>>
>> new DecimalType(),
>>
>> new VarCharType(),
>>
>> sourceEntity
>>
>> },
>>
>> new String[] {
>>
>> "amount",
>>
>> "connectionAccountId",
>>
>> "sourceEntity",
>>
>> });
>>
>> }
>>
>>
>>
>> [3]:  Execution Exception:
>>
>>
>> 2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager -
>> Received uncaught exception.
>>
>> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected
>> exception while polling the records
>>
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>>
>> ...
>>
>> Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for
>> length 1
>>
>> at
>> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>
>> at
>> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>
>> at
>> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>>
>> at java.base/java.util.Objects.checkIndex(Objects.java:372)
>>
>> at