Re: Force to commit kafka offset when stop a job.

2024-06-05 Thread Hang Ruan
Hi Lei.

I think you could use `stop with savepoint` to stop the job.
The offsets are committed when a checkpoint completes, so I think `stop
with savepoint` should help.
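
For context, here is a minimal sketch of the consumer properties that matter for this, assuming the job uses the `KafkaSource` connector. As far as I know, the source only commits offsets on completed checkpoints when a `group.id` is set and `commit.offsets.on.checkpoint` is left enabled. The helper class and the group name are hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper: consumer properties relevant to offset commits. */
final class OffsetCommitProps {
    static Properties build(String groupId) {
        Properties props = new Properties();
        // Offsets are committed under a consumer group, so a group id is needed.
        props.setProperty("group.id", groupId);
        // KafkaSource option (enabled by default, to my knowledge): commit the
        // consumed offsets back to Kafka whenever a checkpoint completes.
        props.setProperty("commit.offsets.on.checkpoint", "true");
        return props;
    }

    public static void main(String[] args) {
        // These properties would then be handed to the source builder,
        // e.g. KafkaSource.builder().setProperties(...).
        System.out.println(build("my-consumer-group"));
    }
}
```

With these in place, stopping the job with a savepoint should leave the committed offsets at the position covered by that savepoint.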

Best,
Hang

On Thu, Jun 6, 2024 at 01:16, Lei Wang wrote:

>
> When stopping a Flink job that consumes Kafka messages, how can I force it to
> commit the Kafka offsets?
>
> Thanks,
> Lei
>


Re: Flink job Deployment problem

2024-06-05 Thread Hang Ruan
Hi, Fokou Toukam.

This error occurs when the schema of the sink does not match the schema
produced by the upstream.
Please check whether the type of the field `features` in the sink is the
same as its type in the upstream.

Best,
Hang

On Thu, Jun 6, 2024 at 10:22, Fokou Toukam, Thierry wrote:

> Hi, I'm trying to deploy a Flink job but I get this error. How can I solve it,
> please?
>
> *Thierry FOKOU *| * IT M.A.Sc  Student*
>
> Département de génie logiciel et TI
>
> École de technologie supérieure  |  Université du Québec
>
> 1100, rue Notre-Dame Ouest
>
> Montréal (Québec)  H3C 1K3
>
>


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-26 Thread Hang Ruan
Hi, all.

Flink CDC provides schema evolution and can sync an entire
database. I think it could satisfy your needs.
Flink CDC pipeline sources and sinks are listed in [1]. An Iceberg pipeline
connector is not available yet.

> What is not is the automatic syncing of entire databases, with schema
> evolution and detection of new (and dropped?) tables. :)

Flink CDC is able to sync the entire database with schema evolution. If a
new table is added to the database, the running pipeline job cannot pick it
up.
But we could enable 'scan.newly-added-table.enabled' and restart the job
from a savepoint to capture the new tables.
This feature has not been released for the MySQL pipeline connector yet, but
a PR[2] has been submitted.

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/pipeline-connectors/overview/
[2] https://github.com/apache/flink-cdc/pull/3347

On Mon, May 27, 2024 at 10:04, Xiqian YU wrote:

> Hi Otto,
>
>
>
> Flink CDC [1] now provides full-DB sync and schema evolution ability as a
> pipeline job. Iceberg sink support was suggested before, and we’re trying
> to implement this in the next few releases. Does it cover the use-cases you
> mentioned?
>
>
>
> [1] https://nightlies.apache.org/flink/flink-cdc-docs-stable/
>
> [2] https://issues.apache.org/jira/browse/FLINK-34840
>
>
>
> Regards,
>
> Xiqian
>
>
>
>
>
> *From: *Andrew Otto 
> *Date: *Friday, 24 May 2024 at 23:06
> *To: *Giannis Polyzos 
> *Cc: *Carlos Sanabria Miranda , Oscar
> Perez via user , Péter Váry <
> peter.vary.apa...@gmail.com>, mbala...@apache.org 
> *Subject: *Re: "Self-service ingestion pipelines with evolving schema via
> Flink and Iceberg" presentation recording from Flink Forward Seattle 2023
>
> Indeed, using Flink-CDC to write to Flink Sink Tables, including Iceberg,
> is supported.
>
>
>
> What is not is the automatic syncing of entire databases, with schema
> evolution and detection of new (and dropped?) tables.  :)
>
>
>
>
>
>
>
>
>
> On Fri, May 24, 2024 at 8:58 AM Giannis Polyzos 
> wrote:
>
> https://nightlies.apache.org/flink/flink-cdc-docs-stable/
>
> All these features come from Flink cdc itself. Because Paimon and Flink
> cdc are projects native to Flink there is a strong integration between them.
>
> (I believe it’s on the roadmap to support iceberg as well)
>
>
>
> On Fri, 24 May 2024 at 3:52 PM, Andrew Otto  wrote:
>
> > I’m curious if there is any reason for choosing Iceberg instead of Paimon
>
>
> No technical reason that I'm aware of.  We are using it mostly because of
> momentum.  We looked at Flink Table Store (before it was Paimon), but
> decided it was too early and the docs were too sparse at the time to really
> consider it.
>
>
>
> > Especially for a use case like CDC that iceberg struggles to support.
>
>
>
> We aren't doing any CDC right now (for many reasons), but I have never
> seen a feature like Paimon's database sync before.  One job to sync and
> evolve an entire database?  That is amazing.
>
>
>
> If we could do this with Iceberg, we might be able to make an argument to
> product managers to push for CDC.
>
>
>
>
>
>
>
> On Fri, May 24, 2024 at 8:36 AM Giannis Polyzos 
> wrote:
>
> I’m curious if there is any reason for choosing Iceberg instead of Paimon
> (other than - iceberg is more popular).
>
> Especially for a use case like CDC that iceberg struggles to support.
>
>
>
> On Fri, 24 May 2024 at 3:22 PM, Andrew Otto  wrote:
>
> Interesting thank you!
>
>
>
> I asked this in the Paimon users group:
>
>
>
> How coupled to Paimon catalogs and tables is the cdc part of Paimon?
> RichCdcMultiplexRecord and related code seem incredibly useful even outside
> of the context of the Paimon table format.
>
>
>
> I'm asking because the database sync action feature is amazing.  At the
> Wikimedia Foundation, we are on an all-in journey with Iceberg.  I'm
> wondering how hard it would be to extract the CDC logic from Paimon and
> abstract the Sink bits.
>
>
>
> Could the table/database sync with schema evolution (without Flink job
> restarts!) potentially work with the Iceberg sink?
>
>
>
>
>
>
>
>
>
> On Thu, May 23, 2024 at 4:34 PM Péter Váry 
> wrote:
>
> If I understand correctly, Paimon is sending `CdcRecord`-s [1] on the wire
> which contain not only the data, but the schema as well.
>
> With Iceberg we currently only send the row data, and expect to receive
> the schema on job start - this is more performant than sending the schema
> all the time, but has the obvious issue that it is not able to handle the
> schema changes. Another part of the dynamic schema synchronization is the
> update of the Iceberg table schema - the schema should be updated for all
> of the writers and the 

Re: Email submission

2024-05-20 Thread Hang Ruan
Hi, Michas.

Please subscribe to the mailing list by sending an email to
user-subscr...@flink.apache.org.

Best,
Hang

On Sun, May 19, 2024 at 04:34, Michas Szacillo (BLOOMBERG/ 919 3RD A) wrote:

> Sending my email to join the apache user mailing list.
>
> Email: mszaci...@bloomberg.net
>


Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 Thread Hang Ruan
Congratulations!

Thanks for the great work.

Best,
Hang

On Fri, May 17, 2024 at 17:33, Qingsheng Ren wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink CDC 3.1.0.
>
> Apache Flink CDC is a distributed data integration tool for real time
> data and batch data, bringing the simplicity and elegance of data
> integration via YAML to describe the data movement and transformation
> in a data pipeline.
>
> Please check out the release blog post for an overview of the release:
>
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink CDC can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20cdc
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Qingsheng Ren
>


Re: [EXTERNAL]Re: Flink kafka connector for v 1.19.0

2024-05-16 Thread Hang Ruan
Hi, Niklas.

The Kafka connector version 3.2.0[1] is for Flink 1.19, and it already has a
vote thread[2]. But there are not enough votes yet.

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-35138
[2] https://lists.apache.org/thread/7shs2wzb0jkfdyst3mh6d9pn3z1bo93c

On Thu, May 16, 2024 at 22:04, Niklas Wilcke wrote:

> Hi Ahmed,
>
> are you aware of a blocker? I'm also a bit confused that after Flink 1.19
> being available for a month now the connectors still aren't. It would be
> great to get some insights or maybe a reference to an issue. From looking
> at the Github repos and the Jira I wasn't able to spot something obvious
> telling me that this matter is really in the focus. Thank you!
>
> Regards,
> Niklas
>
>
> On 10. May 2024, at 20:10, Ahmed Hamdy  wrote:
>
> Hi Aniket
>
> The community is currently working on releasing a new version for all the
> connectors that is compatible with 1.19. Please follow the announcements in
> Flink website[1] to get notified when it is available.
>
> 1-https://flink.apache.org/posts/
> Best Regards
> Ahmed Hamdy
>
>
> On Fri, 10 May 2024 at 18:14, Aniket Sule 
> wrote:
>
>> Hello,
>>
>> On the Flink downloads page, the latest stable version is Flink 1.19.0.
>> However, the Flink Kafka connector is v 3.1.0, that is compatible with
>> 1.18.x.
>>
>> Is there a timeline when the Kafka connector for v 1.19 will be released?
>> Is it possible to use the v3.1.0 connector with Flink v 1.19?
>>
>>
>>
>> Thanks and regards,
>>
>> Aniket Sule
>> Caution: External email. Do not click or open attachments unless you know
>> and trust the sender.
>>
>
>


Re: monitoring message latency for flink sql app

2024-05-16 Thread Hang Ruan
Hi, mete.

As Feng Jin said, I think you could make use of the metric
`currentEmitEventTimeLag`.
Besides that, if you develop your job with the DataStream API, you could
add an operator of your own to measure the delay.
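
As a rough sketch of what such an operator would compute (the class and method names are hypothetical, and the event time is assumed to be available as a timestamp on each message), the core logic is the difference between the processing instant and the event time, compared against an alert threshold:

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for the body of a custom map/process operator. */
final class EventLag {
    /** Delay between the record's event time and the processing instant, never negative. */
    static Duration lag(Instant eventTime, Instant now) {
        Duration d = Duration.between(eventTime, now);
        // Clock skew can put the event time in the future; clamp to zero.
        return d.isNegative() ? Duration.ZERO : d;
    }

    /** True when the delay is strictly larger than the alerting threshold. */
    static boolean exceeds(Instant eventTime, Instant now, Duration threshold) {
        return lag(eventTime, now).compareTo(threshold) > 0;
    }
}
```

Inside the operator, the result could be exposed as a gauge or logged, and the alert would then be defined in the monitoring system.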

Best,
Hang

On Fri, May 17, 2024 at 02:44, Feng Jin wrote:

> Hi Mete
>
> You can refer to the metrics provided by the Kafka source connector.
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#monitoring
>
> Best,
> Feng
>
> On Thu, May 16, 2024 at 7:55 PM mete  wrote:
>
>> Hello,
>>
>> For an sql application using kafka as source (and kafka as sink) what
>> would be the recommended way to monitor for processing delay? For example,
>> i want to be able to alert if the app has a certain delay compared to some
>> event time field in the message.
>>
>> Best,
>> Mete
>>
>>
>>


Re: Unsubscribe

2024-05-11 Thread Hang Ruan
Please send an email with any content to user-zh-unsubscr...@flink.apache.org
to unsubscribe from the user-zh@flink.apache.org mailing list. You can refer
to [1][2] to manage your mailing list subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On Sat, May 11, 2024 at 10:06, 爱看书不识字 wrote:

> Unsubscribe


Re: Incremental snapshot based source

2024-05-08 Thread Hang Ruan
Hi, cloud young.

The property 'log.mining.strategy' is actually a setting of the Oracle
Debezium connector[1].
Do you see any exceptions in your job, and which version are you
using?

Best,
Hang

[1]
https://debezium.io/documentation/reference/1.9/connectors/oracle.html#oracle-property-log-mining-strategy

On Wed, May 8, 2024 at 16:35, cloud young wrote:

> I have read the doc incremental-snapshot-based-datastream-experimental
> and run the example code. It works. But I also need to get the DDL changes,
> so I changed debeziumProperties.setProperty("log.mining.strategy",
> "online_catalog");
> to debeziumProperties.setProperty("log.mining.strategy",
> "redo_log_catalog");
> After I run the modified code, nothing is printed in the console.
> I set a breakpoint in the JsonDebeziumDeserializationSchema#deserialize
> method, and the method is not invoked at all.
> Why? How can I resolve this?
>


Re: Elasticsearch8 example

2024-04-17 Thread Hang Ruan
Hi Tauseef.

I see that support for Elasticsearch 8[1] will be released
in elasticsearch-3.1.0, so there are no docs for elasticsearch8 yet.
Until the docs are ready, we could learn how to use it from the tests[2].

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-26088
[2]
https://github.com/apache/flink-connector-elasticsearch/blob/main/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java

On Wed, Apr 17, 2024 at 01:12, Tauseef Janvekar wrote:

> Dear Team,
>
> Can anyone please share an example for flink-connector-elasticsearch8
> I found this connector being added to the github. But no proper
> documentation is present around it.
>
> It will be of great help if a sample code is provided on the above
> connector.
>
> Thanks,
> Tauseef
>


Re: Table Source from Parquet Bug

2024-04-17 Thread Hang Ruan
Hi, David.

Have you added the parquet format[1] dependency to your project?
It seems that the class ParquetColumnarRowInputFormat cannot be found.

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/parquet/

On Wed, Apr 17, 2024 at 04:42, Sohil Shah wrote:

> Hello David,
>
> Since this is a ClassNotFoundException, you may be missing a dependency.
> Could you share your pom.xml?
>
> Thanks
> -Sohil
> Project: Braineous https://bugsbunnyshah.github.io/braineous/
>
> On Tue, Apr 16, 2024 at 11:25 AM David Silva via user <
> user@flink.apache.org> wrote:
>
>> Hi,
>>
>> Our team would like to leverage Flink but we're running into some issues
>> with reading from a parquet file source. I *think* it's an issue with
>> the Flink API
>>
>> Could someone please help take a look?
>>
>> We're using *Scala 2.12* & *Flink 1.18.1*
>>
>> I attached a copy of the code, the terminal output, and the flink logs.
>>
>> The issue is @ *MacFlapAggregator.scala:324*, it errors because of:
>> *Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat*
>>
>>
>> - *MacFlapAggregator.scala:206* creates and queries the same exact
>>   table successfully though
>> - *MacFlapAggregator.scala:318* If I create the table using a CSV
>>   source, it works
>>
>>
>> I also posted in the slack server here
>> https://apache-flink.slack.com/archives/C03G7LJTS2G/p1713209215085589
>>
>> Any help with this would be immensely helpful, our team has been
>> struggling with this for a couple days now.
>>
>> Thanks!
>>
>


Re: How to handle tuple keys with null values

2024-04-02 Thread Hang Ruan
Hi Sachin.

I think we could cast the Long to a String to handle the null value. Or,
as Asimansu said, try to filter out the null data.
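
A minimal sketch of the first option, in plain Java: the nullable Long is turned into a String that is never null, so the tuple serializer no longer sees a null field. The class name and the marker value are hypothetical, and the marker is assumed not to collide with a real id.

```java
/** Hypothetical helper: turn a nullable Long into a non-null String key part. */
final class NullSafeKey {
    // Placeholder used for missing values; assumed not to clash with a real id.
    static final String NULL_MARKER = "NULL";

    static String keyPart(Long value) {
        return value == null ? NULL_MARKER : value.toString();
    }
}
```

In the key selector this would be used as `Tuple2.of(data.id, NullSafeKey.keyPart(data.id1))`, which changes the second field of the key type to String. Note that all records of one `id` with a null `id1` then share the same key.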

Best,
Hang

On Wed, Apr 3, 2024 at 08:35, Asimansu Bera wrote:

> Hello Sachin,
>
> The same issue had been reported in the past and JIRA was closed without
> resolution.
>
> https://issues.apache.org/jira/browse/FLINK-4823
>
> I do see this as a data quality issue. You need to decide what you
> would like to do with the null value. Either way, it is better to filter out
> the null data earlier so that you do not need to handle the null, or you may
> also try using a POJO, as a POJO might support null.
>
> Sincerely,
> -A
>
>
> On Tue, Apr 2, 2024 at 12:21 PM Sachin Mittal  wrote:
>
>> Hello folks,
>> I am keying my stream using a Tuple:
>>
>> example:
>>
>> public class MyKeySelector implements KeySelector> {
>>
>> @Override
>> public Tuple2 getKey(Data data) {
>>   return Tuple2.of(data.id, data.id1);
>> }
>>
>> }
>>
>> Now id1 can have null values. In this case how should I handle this?
>>
>> Right now I am getting this error:
>>
>> java.lang.RuntimeException: Exception occurred while setting the current key 
>> context.
>> at 
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
>> [flink-dist-1.17.1.jar:1.17.1]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
>> [flink-dist-1.17.1.jar:1.17.1]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
>> [flink-dist-1.17.1.jar:1.17.1]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
>> Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but 
>> expected to hold a value.
>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:135)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.serializeKeyGroupAndKey(SerializedCompositeKeyBuilder.java:192)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.setKeyAndKeyGroup(SerializedCompositeKeyBuilder.java:95)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:431)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>> at 
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
>>  

Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-20 Thread Hang Ruan
Congratulations!

Best,
Hang

On Thu, Mar 21, 2024 at 09:54, Lincoln Lee wrote:

>
> Congrats, thanks for the great work!
>
>
> Best,
> Lincoln Lee
>
>
> On Wed, Mar 20, 2024 at 22:48, Peter Huang wrote:
>
>> Congratulations
>>
>>
>> Best Regards
>> Peter Huang
>>
>> On Wed, Mar 20, 2024 at 6:56 AM Huajie Wang  wrote:
>>
>>>
>>> Congratulations
>>>
>>>
>>>
>>> Best,
>>> Huajie Wang
>>>
>>>
>>>
>>> On Wed, Mar 20, 2024 at 21:36, Leonard Xu wrote:
>>>
>>>> Hi devs and users,
>>>>
>>>> We are thrilled to announce that the donation of Flink CDC as a
>>>> sub-project of Apache Flink has completed. We invite you to explore the new
>>>> resources available:
>>>>
>>>> - GitHub Repository: https://github.com/apache/flink-cdc
>>>> - Flink CDC Documentation:
>>>> https://nightlies.apache.org/flink/flink-cdc-docs-stable
>>>>
>>>> After Flink community accepted this donation[1], we have completed
>>>> software copyright signing, code repo migration, code cleanup, website
>>>> migration, CI migration and github issues migration etc.
>>>> Here I am particularly grateful to Hang Ruan, Zhongqaing Gong,
>>>> Qingsheng Ren, Jiabao Sun, LvYanquan, loserwang1024 and other contributors
>>>> for their contributions and help during this process!
>>>>
>>>>
>>>> For all previous contributors: The contribution process has slightly
>>>> changed to align with the main Flink project. To report bugs or suggest new
>>>> features, please open tickets in
>>>> Apache Jira (https://issues.apache.org/jira).  Note that we will no
>>>> longer accept GitHub issues for these purposes.
>>>>
>>>>
>>>> Welcome to explore the new repository and documentation. Your feedback
>>>> and contributions are invaluable as we continue to improve Flink CDC.
>>>>
>>>> Thanks everyone for your support and happy exploring Flink CDC!
>>>>
>>>> Best,
>>>> Leonard
>>>> [1] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
>>>>
>>>>


Re: can we use Scan Newly Added Tables without restarting the existing job ?

2024-03-20 Thread Hang Ruan
Hi, 3pang zhu.

The `Scan Newly Added Tables` feature requires restarting the job from a
savepoint. Currently, new tables cannot be added to a running job without a
restart.

Best,
Hang

On Wed, Mar 20, 2024 at 15:22, 3pang zhu wrote:

> This link describes the usage of [Scan Newly Added Tables]:
> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#scan-newly-added-tables
> Can we use it without restarting the job? I have tried this patch, which uses
> a scheduled task in MySqlSnapshotSplitAssigner#open(). When tables are added
> more than twice, this issue occurs:
> https://github.com/apache/flink-cdc/issues/2282
>
>
>  
> .../flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>  | 26 +++---
>  
> .../flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
>  |  5 +++--
>  2 files changed, 26 insertions(+), 5 deletions(-)
>
> diff --cc
> flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.jav
> index 0536a262,0536a262..d52acc26
> ---
> a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
> +++
> b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
> @@@ -56,7 -56,7 +56,9 @@@ import java.util.Set
>   import java.util.concurrent.CopyOnWriteArrayList;
>   import java.util.concurrent.ExecutorService;
>   import java.util.concurrent.Executors;
> ++import java.util.concurrent.ScheduledExecutorService;
>   import java.util.concurrent.ThreadFactory;
> ++import java.util.concurrent.TimeUnit;
>   import java.util.stream.Collectors;
>
>   import static
> com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
> @@@ -94,6 -94,6 +96,7 @@@ public class MySqlSnapshotSplitAssigne
>   private MySqlChunkSplitter chunkSplitter;
>   private boolean isTableIdCaseSensitive;
>   private ExecutorService executor;
> ++private ScheduledExecutorService scheduledExecutor;
>
>   @Nullable private Long checkpointIdToFinish;
>
> @@@ -179,12 -179,12 +182,24 @@@
>   @Override
>   public void open() {
>   chunkSplitter.open();
> --discoveryCaptureTables();
> --captureNewlyAddedTables();
> --startAsynchronouslySplit();
> ++if (scheduledExecutor == null) {
> ++ThreadFactory threadFactory =
> ++new
> ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
> ++this.scheduledExecutor =
> Executors.newSingleThreadScheduledExecutor(threadFactory);
> ++}
> ++scheduledExecutor.scheduleAtFixedRate(
> ++() -> {
> ++discoveryCaptureTables();
> ++captureNewlyAddedTables();
> ++startAsynchronouslySplit();
> ++},
> ++0,
> ++1,
> ++TimeUnit.MINUTES);
>   }
>
>   private void discoveryCaptureTables() {
> ++LOG.info("start discovery capture tables");
>   // discovery the tables lazily
>   if (needToDiscoveryTables()) {
>   long start = System.currentTimeMillis();
> @@@ -216,6 -216,6 +231,7 @@@
>   }
>
>   private void captureNewlyAddedTables() {
> ++LOG.info("start to capture newly added tables");
>   if (sourceConfig.isScanNewlyAddedTableEnabled()) {
>   // check whether we got newly added tables
>   try (JdbcConnection jdbc = openJdbcConnection(sourceConfig))
> {
> @@@ -282,6 -282,6 +298,7 @@@
>   }
>
>   private void startAsynchronouslySplit() {
> ++LOG.info("start asynchronously split");
>   if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
>   if (executor == null) {
>   ThreadFactory threadFactory =
> @@@ -497,6 -497,6 +514,9 @@@
>   if (executor != null) {
>   executor.shutdown();
>   }
> ++if (scheduledExecutor != null) {
> ++scheduledExecutor.shutdown();
> ++}
>   }
>


Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Hang Ruan
Hi, Sachin.

I ran the command `jar -tf flink-dist-1.18.0.jar| grep OutputTag` to confirm
that this class is packaged correctly in flink-dist.
I think you should check your own jar and make sure this class is not
packaged in it.
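
If it is more convenient than `jar -tf`, the same check can be sketched in Java with the standard `java.util.jar.JarFile` API. The class and method names here are hypothetical; the jar path and class name are passed as arguments.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.JarFile;

/** Hypothetical helper: check whether a jar bundles a given class. */
final class JarClassCheck {
    /** Converts a fully qualified class name to its entry path inside a jar. */
    static String entryNameFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath contains the given class. */
    static boolean containsClass(String jarPath, String className) {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryNameFor(className)) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("usage: JarClassCheck <jar-path> <class-name>");
            return;
        }
        System.out.println(containsClass(args[0], args[1]));
    }
}
```

For the job jar, a result of `true` for `org.apache.flink.api.common.ExecutionConfig` would mean that Flink core classes are bundled and may conflict with the ones in flink-dist.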

Best,
Hang

On Tue, Mar 12, 2024 at 20:29, Sachin Mittal wrote:

> I mistyped.  It’s version 1.18.
>
> This is the latest version, and it works locally but not on AWS EMR, where
> I get a class not found exception.
>
>
>
> On Tue, 12 Mar 2024 at 1:25 PM, Zhanghao Chen 
> wrote:
>
>> Hi Sachin,
>>
>> Flink 1.8 series have already been out of support, have you tried with a
>> newer version of Flink?
>> --
>> *From:* Sachin Mittal 
>> *Sent:* Tuesday, March 12, 2024 14:48
>> *To:* user@flink.apache.org 
>> *Subject:* Facing ClassNotFoundException:
>> org.apache.flink.api.common.ExecutionConfig on EMR
>>
>> Hi,
>> We have installed a flink cluster version 1.8.0 on AWS EMR.
>> However when we submit a job we get the following error:
>>
>> (Do note that when we submit the same job on a local instance of Flink
>> 1.8.1 it is working fine.
>> The fat jar we submit has all the flink dependencies from 1.8.0 including
>> the class org.apache.flink.api.common.ExecutionConfig).
>>
>> Caused by: java.lang.RuntimeException: 
>> org.apache.flink.runtime.client.JobInitializationException: Could not start 
>> the JobMaster.
>>  at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>>  at 
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
>>  at 
>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>>  at 
>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>>  at 
>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>>  at 
>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>>  at 
>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
>> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
>> not start the JobMaster.
>>  at 
>> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>>  at 
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>>  at 
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>>  at java.base/java.lang.Thread.run(Thread.java:840)
>> Caused by: java.util.concurrent.CompletionException: 
>> java.lang.RuntimeException: java.lang.ClassNotFoundException: 
>> org.apache.flink.api.common.ExecutionConfig
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>>  ... 3 more
>> Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: 
>> org.apache.flink.api.common.ExecutionConfig
>>  at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>>  at 
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>>  ... 3 more
>> Caused by: java.lang.ClassNotFoundException: 
>> org.apache.flink.api.common.ExecutionConfig
>>  at 
>> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
>>  at 
>> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
>>  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
>>  at java.base/java.lang.Class.forName0(Native Method)
>>  at java.base/java.lang.Class.forName(Class.java:467)
>>
>>


Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Hang Ruan
Hi, Sachin.

This error occurs when there is a class conflict. You do not need to package
flink-core in your own jar; it is already included in flink-dist.
Also, Flink 1.8 is very old, so it is better to upgrade your Flink version.
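
To see which jar actually provides a class at runtime, a small diagnostic can help. The following is only a minimal sketch using the plain JDK; the class name `ClassOrigin` and its `describe` helper are made up for illustration and are not part of Flink.

```java
import java.security.CodeSource;

/** Hypothetical helper: reports which jar or directory a class was loaded from. */
final class ClassOrigin {

    /** Returns the code-source location of the class, or a marker for JDK bootstrap classes. */
    static String describe(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // Classes loaded by the bootstrap class loader have no code source.
        return source == null || source.getLocation() == null
                ? "bootstrap (no code source)"
                : source.getLocation().toString();
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "org.apache.flink.api.common.ExecutionConfig";
        try {
            System.out.println(name + " <- " + describe(Class.forName(name)));
        } catch (ClassNotFoundException e) {
            // Same symptom as in the stack trace: the class is not visible to this class loader.
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

Running it with the job's classpath should show whether the class comes from the fat jar, from flink-dist, or from nowhere at all.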

Best,
Hang



Sachin Mittal wrote on Tue, Mar 12, 2024 at 16:04:

> Hi,
> We have installed a flink cluster version 1.8.0 on AWS EMR.
> However when we submit a job we get the following error:
>
> (Do note that when we submit the same job on a local instance of Flink
> 1.8.1 it is working fine.
> The fat jar we submit has all the flink dependencies from 1.8.0 including
> the class org.apache.flink.api.common.ExecutionConfig).
>
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
>   at 
> java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
>   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
> not start the JobMaster.
>   at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>   at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>   at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.RuntimeException: java.lang.ClassNotFoundException: 
> org.apache.flink.api.common.ExecutionConfig
>   at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>   at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>   ... 3 more
> Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: 
> org.apache.flink.api.common.ExecutionConfig
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
>   at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>   ... 3 more
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.api.common.ExecutionConfig
>   at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
>   at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
>   at java.base/java.lang.Class.forName0(Native Method)
>   at java.base/java.lang.Class.forName(Class.java:467)
>
>


Re: Unsubscribe

2024-03-12 Thread Hang Ruan
Please send an email with any content to user-zh-unsubscr...@flink.apache.org
to unsubscribe from the user-zh@flink.apache.org mailing list. You can refer
to [1][2] to manage your mailing list subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

willluzheng wrote on Tue, Mar 12, 2024 at 14:04:

> Unsubscribe
>  Original message being replied to 
> | From | 王阳 |
> | Date sent | 2024-03-12 13:49 |
> | To | user-zh@flink.apache.org |
> | Subject | Unsubscribe |
> Unsubscribe
>
>


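Re: Unsubscribe

2024-03-12 Thread Hang Ruan
Please send an email with any content to user-zh-unsubscr...@flink.apache.org
to unsubscribe from the user-zh@flink.apache.org mailing list. You can refer
to [1][2] to manage your mailing list subscriptions.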

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

熊柱 <18428358...@163.com> wrote on Tue, Mar 12, 2024 at 11:43:

> Unsubscribe


Re: Unsubscribe

2024-03-10 Thread Hang Ruan
Please send an email with any content to user-zh-unsubscr...@flink.apache.org
to unsubscribe from the user-zh@flink.apache.org mailing list. You can refer
to [1][2] to manage your mailing list subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

王新隆 wrote on Mon, Mar 11, 2024 at 10:18:

> Unsubscribe


Re: Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-03-10 Thread Hang Ruan
Hi, Xuyang & Daniel.

I have checked this part of the code, and I think this is expected behavior.
As the code comments note, this loop makes sure that the transactions
before this checkpoint id are re-created.

The situation Daniel mentioned will happen only when all checkpoints between
1 and 2 fail. If so, we should check why these checkpoints failed.
The transactional producer is used only when the DeliveryGuarantee is
EXACTLY_ONCE. If another DeliveryGuarantee is acceptable, you could use it
to skip this loop.

I think it is better to check whether many checkpoints have failed,
and to check the flame graph to make sure this code caused the busyness.
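
To make the cost of the gap-filling loop concrete, here is a minimal stand-alone sketch of its bounds. It does not use the Kafka connector: `buildTransactionalId` below is a hypothetical stand-in for `TransactionalIdFactory.buildTransactionalId` (the real id format may differ), and the checkpoint ids are made-up numbers.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: counts the transactional ids the quoted loop would initialize. */
final class TransactionGapSketch {

    /** Hypothetical id format; the real factory in the connector may build ids differently. */
    static String buildTransactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    /** Same bounds as the quoted loop: one id per checkpoint in (lastCheckpointId, checkpointId]. */
    static List<String> idsToInitialize(
            String prefix, int subtaskId, long lastCheckpointId, long checkpointId) {
        if (checkpointId <= lastCheckpointId) {
            throw new IllegalStateException(
                    "Expected " + checkpointId + " > " + lastCheckpointId);
        }
        List<String> ids = new ArrayList<>();
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            ids.add(buildTransactionalId(prefix, subtaskId, id));
        }
        return ids;
    }

    public static void main(String[] args) {
        // A small gap: three producers are initialized.
        System.out.println(idsToInitialize("sink", 0, 1, 4));
        // A large gap: the loop body runs once per missing checkpoint id.
        System.out.println(idsToInitialize("sink", 0, 1, 20_000).size());
    }
}
```

The number of iterations grows linearly with the gap between the two checkpoint ids, which is why a large gap can keep the operator busy for a long time.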

Best,
Hang

Xuyang wrote on Mon, Mar 11, 2024 at 09:58:

> Hi, Danny.
> When the problem occurs, can you use flame graph to confirm whether the
> loop in this code is causing the busyness?
> Since I'm not particularly familiar with kafka connector, I can't give you
> an accurate reply. I think Hang Ruan is an expert in this field :).
>
> Hi, Ruan Hang. Can you take a look at this strange situation?
>
>
> --
> Best!
> Xuyang
>
>
> At 2024-03-10 16:49:16, "Daniel Peled" wrote:
>
> Hello,
>
> I am sorry I am addressing you personally.
> I have tried sending the request in the user group and got no response
>
> If you can't help me, please let me know,
> and please tell me who can help us.
>
> The problem is as follows:
>
> We have noticed that when we add a *new kafka sink* operator to the
> graph, *and start from the last save point*, the operator is 100% busy
> for several minutes and *even 1/2-1 hour* !!!
>
> The problematic code seems to be the following for-loop in
> getTransactionalProducer() method:
>
>
> *org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*
>
> private FlinkKafkaInternalProducer getTransactionalProducer(long checkpointId) {
>     checkState(
>             checkpointId > lastCheckpointId,
>             "Expected %s > %s",
>             checkpointId,
>             lastCheckpointId);
>     FlinkKafkaInternalProducer producer = null;
>     // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
>     // this loop ensures that all gaps are filled with initialized (empty) transactions
>     for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
>         String transactionalId =
>                 TransactionalIdFactory.buildTransactionalId(
>                         transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
>         producer = getOrCreateTransactionalProducer(transactionalId);
>     }
>     this.lastCheckpointId = checkpointId;
>     assert producer != null;
>     LOG.info("Created new transactional producer {}", producer.getTransactionalId());
>     return producer;
> }
>
>
> Since we added a new sink operator the lastCheckpointId is 1,
> And if for example the checkpointId is 2,
> The loop will be executed for 2 times !!!
>
>
> We have several questions:
> 1. Is this behaviour expected ?
> 2. Are we doing something wrong ?
> 3. Is there a way to avoid this behavior ?
>
> Best Regards,
> Danny
>
>


Re: SecurityManager in Flink

2024-03-06 Thread Hang Ruan
Hi, Kirti.

Could you please provide the stack trace of this NPE? I checked the code, and
I think the problem may lie in LocalFileSystem#listStatus.
The code in line 161[1] may return null, which makes
LocalFileSystem#listStatus return null. Then `containedFiles` is null
and the NPE occurs.
I think we should add code to handle this situation, as follows.

```
final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
if (containedFiles == null) {
    throw new FlinkRuntimeException(
            "Cannot list files under " + fileStatus.getPath());
}
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```
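
The underlying JDK behavior can be reproduced without Flink. The sketch below assumes that the local file system ultimately delegates to `java.io.File` listing, which returns null instead of throwing when a path cannot be listed; `SafeListing` and `listOrThrow` are hypothetical names used only for this illustration.

```java
import java.io.File;
import java.io.IOException;

/** Illustration only: turns the null result of File#listFiles into an exception. */
final class SafeListing {

    /** Lists the children of dir, failing loudly when the JDK reports "cannot list" as null. */
    static File[] listOrThrow(File dir) throws IOException {
        // listFiles() returns null if the path is not a directory or an I/O error occurs.
        File[] children = dir.listFiles();
        if (children == null) {
            throw new IOException("Cannot list files under " + dir);
        }
        return children;
    }

    public static void main(String[] args) {
        File dir = new File(args.length > 0 ? args[0] : "no-such-directory");
        try {
            System.out.println(listOrThrow(dir).length + " entries under " + dir);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Pointing it at a directory without read permission should show whether the JDK returns null in that environment.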

Best,
Hang

[1]
https://github.com/apache/flink/blob/9b1375520b6b351df7551d85fcecd920e553cc3a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L161C32-L161C38

Kirti Dhar Upadhyay K via user wrote on Wed, Mar 6, 2024 at 18:10:

> Hi Team,
>
>
>
> I am using Flink File Source with Local File System.
>
> I am facing an issue: if the source directory does not have read permission,
> the list of files is returned as null instead of a permission exception being
> thrown (refer to the highlighted line below), resulting in an NPE.
>
>
>
> final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
> for (FileStatus containedStatus : containedFiles) {
> addSplitsForPath(containedStatus, fs, target);
> }
>
> Debugging the issue found that, SecurityManager is coming as null while
> listing the files, hence skipping the permissions on directory.
>
> What is the way to set SecurityManager in Flink?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>


Re: Unsubscribe

2024-03-03 Thread Hang Ruan
Please send an email with any content to user-zh-unsubscr...@flink.apache.org
to unsubscribe from the user-zh@flink.apache.org mailing list. You can refer
to [1][2] to manage your mailing list subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

4kings...@gmail.com <4kings...@gmail.com> wrote on Sat, Mar 2, 2024 at 19:24:

> Unsubscribe
> 4kings...@gmail.com
> Email: 4kings...@gmail.com


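Re: How does the Debezium underlying Flink CDC register schemas to the Confluent Schema Registry?

2024-02-29 Thread Hang Ruan
Hi, casel.chen.

This part is not involved in the CDC project. CDC relies on Debezium's engine
to read the change data directly and does not write to Kafka the way Debezium
itself does.
You could ask the Debezium community about this. The Debezium developers
should be more familiar with this part.

Best regards,
Hang

casel.chen wrote on Thu, Feb 29, 2024 at 18:11:

> I searched the Debezium source code but could not find any place that calls
> SchemaRegistryClient.register. How does it register schemas to the Confluent
> Schema Registry?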


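Re: MySQL CDC: Stream API and SQL API produce different output

2024-02-29 Thread Hang Ruan
Hello, ha.fengqi.

The MySQL CDC connector relies on checkpoint completion to switch to the
incremental phase only when it runs with a parallelism greater than one. From
the code you provided, the job runs with a parallelism of one, so the source
should behave the same in both cases.
Could the difference come from the downstream behaving differently in the two
usages?
You can check the I/O metrics to see whether the source really emits records
in time. If you are familiar with the code, you can also add some log output
to verify.

Best regards,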
Hang


Re: DESCRIBE CATALOG not available?

2024-01-28 Thread Hang Ruan
Hi, Robin.

I see that the `DESCRIBE CATALOG` statement is not listed in the DESCRIBE
document[1], so it is not available.
Besides this, I checked the changes to Catalog.java in the commits from May 9,
2019, and I cannot find the `explainCatalog` method that this FLIP introduces.
So this FLIP is not finished yet.

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/describe/

Robin Moffatt via user wrote on Sat, Jan 27, 2024 at 01:01:

> In FLIP-69 [1] it looks like DESCRIBE CATALOG was added, but when I
> try it from SQL Client in 1.18.1 it doesn't seem to work
>
> Flink SQL> SHOW CATALOGS;
> +-----------------+
> |    catalog name |
> +-----------------+
> |           c_new |
> | default_catalog |
> +-----------------+
> 2 rows in set
>
> Flink SQL> DESCRIBE CATALOG default_catalog;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: Column
> 'default_catalog' not found in any table
>
> Poking around I found the JIRA that implemented it [2] and can see the
> code present today [3]
>
> Any ideas?
>
> Thanks, Robin.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement
> [2] https://issues.apache.org/jira/browse/FLINK-14689
> [3]
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeCatalog.java
>


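Re: Can window functions be applied after reading data with Flink CDC?

2024-01-17 Thread Hang Ruan
Hello,

The CDC source does not currently support window functions.

However, you can achieve a similar effect with a non-windowed aggregation, as follows:

   1. Use the DATE_FORMAT function to convert the time field into a
      minute-granularity string that serves as the window value.
   2. GROUP BY the window value to aggregate.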
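
The bucketing idea behind these two steps can be illustrated outside Flink. The following is a plain-Java sketch, not Flink SQL: it formats each timestamp to minute granularity (the role DATE_FORMAT plays) and counts per formatted value (the role GROUP BY plays). The class name `MinuteBuckets` and the sample timestamps are made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustration only: minute-granularity bucketing followed by a per-bucket count. */
final class MinuteBuckets {

    private static final DateTimeFormatter MINUTE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    /** Groups event times by their minute string and counts the events in each group. */
    static Map<String, Long> countPerMinute(List<LocalDateTime> eventTimes) {
        Map<String, Long> counts = new TreeMap<>();
        for (LocalDateTime time : eventTimes) {
            // The formatted minute acts as the "window value".
            counts.merge(MINUTE.format(time), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<LocalDateTime> events = List.of(
                LocalDateTime.of(2024, 1, 17, 19, 24, 3),
                LocalDateTime.of(2024, 1, 17, 19, 24, 40),
                LocalDateTime.of(2024, 1, 17, 19, 25, 1));
        System.out.println(countPerMinute(events));
    }
}
```

Unlike a real window, such a grouping keeps updating its result as changes arrive, which is why it can work on a changelog stream.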

Best,
Hang

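Xuyang wrote on Wed, Jan 17, 2024 at 19:34:

> Hi,
> In Flink SQL, you can apply windows after reading CDC data by using Group Window[1].
> Could you describe the "never takes effect" symptom and the SQL in more detail?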
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-agg/#selecting-group-window-start-and-end-timestamps-1
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
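> At 2024-01-17 19:24:03, "2813732510" <2813732...@qq.com.INVALID> wrote:
> >Can windows be applied after Flink CDC reads binlog data? I tested sliding
> >windows and aggregation, but they never take effect. Is there some special usage?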
>


Re: Unsubscribe

2024-01-15 Thread Hang Ruan
Please send an email with any content to user-zh-unsubscr...@flink.apache.org
to unsubscribe from the user-zh@flink.apache.org mailing list. You can refer
to [1][2] to manage your mailing list subscriptions.

Best,
Jiabao

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

yd c wrote on Mon, Jan 15, 2024 at 08:47:

> Unsubscribe


Re: flink-connector-dynamodb 4.2.0-1.18 does not provide flink-connector-base dependency

2024-01-11 Thread Hang Ruan
Hi, Tamir.

This is an expected behavior. The flink-connector-base is already included
in flink-dist and we will not package it in the externalized connectors.
You could see this issue[1] for more details.

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-30400?filter=-1

Tamir Sagi wrote on Thu, Jan 11, 2024 at 23:09:

> Hi
>
> I updated the DynamoDB connector to 4.2.0-1.18, but it does not provide the
> flink-connector-base dependency, whereas 4.1.0-1.17 does. [1]
>
> In its POM, flink-connector-base appears only as a test-jar in test scope.
>
> I'm working with a custom
> org.apache.flink.connector.base.sink.writer.ElementConverter, which is not
> available without bringing that dependency into my project.
> 4.2.0-1.18 provides flink-connector-aws-base, but there flink-connector-base is
> in provided scope. [2] [3]
>
> is that a mistake or intentional?
>
> Thanks,
> Tamir
>
>
> [1]
> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-dynamodb/4.1.0-1.17
> [2]
> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-dynamodb/4.2.0-1.18
> [3]
> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-aws-base/4.2.0-1.18
> ​
>
>
>
>
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>


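Re: Re: Flink CDC MySqlSplitReader question

2023-12-24 Thread Hang Ruan
Hi,

As I recall, this logic ensures that, after new tables are added, binlog
reading can proceed together with the snapshot reading of the new tables, so
that binlog reading is not interrupted.
Here it should read the binlog first, then the snapshot, and then the binlog
again. This switching ensures that data keeps being read from the binlog.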

Best,
Hang

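casel.chen wrote on Fri, Dec 22, 2023 at 10:44:

> So does that mean the tables already in the incremental phase are processed
> first, and then the newly added tables in the snapshot phase? What would be
> the impact if the order were reversed? If a newly added table has a lot of
> full data, would that slow down incremental processing of the other tables?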
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
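> At 2023-12-20 21:40:05, "Hang Ruan" wrote:
> >Hi, casel
> >
> >This logic should only be reached when newly added tables are being handled.
> >Normally, CDC dispatches the BinlogSplit only after all SnapshotSplits have been read.
> >However, if the job is restarted during the incremental phase and some tables are newly added,
> >a BinlogSplit and SnapshotSplits exist at the same time, and only then is this logic reached.
> >
> >Best,
> >Hang
> >
> >
> >key lou wrote on Wed, Dec 20, 2023 at 16:24:
> >
> >> It means that when there is a binlog split, the snapshot has already been fully read
> >>
> >> casel.chen wrote on Tue, Dec 19, 2023 at 16:45:
> >>
> >> > While reading the source code of the flink-connector-mysql-cdc project, I ran into
> >> > something I do not understand. I would appreciate some guidance, thanks!
> >> >
> >> >
> >> > The MySqlSplitReader class contains the code below. I do not understand the comment
> >> > "(1) Reads binlog split firstly and then read snapshot split".
> >> > Why read the binlog split before the snapshot split? To preserve record order, shouldn't
> >> > the full snapshot split be read first and the incremental binlog split afterwards?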
> >> >
> >> > private MySqlRecords pollSplitRecords() throws InterruptedException {
> >> > Iterator dataIt;
> >> > if (currentReader == null) {
> >> > // (1) Reads binlog split firstly and then read snapshot
> >> split
> >> > if (binlogSplits.size() > 0) {
> >> > // the binlog split may come from:
> >> > // (a) the initial binlog split
> >> > // (b) added back binlog-split in newly added table
> >> process
> >> > MySqlSplit nextSplit = binlogSplits.poll();
> >> > currentSplitId = nextSplit.splitId();
> >> > currentReader = getBinlogSplitReader();
> >> > currentReader.submitSplit(nextSplit);
> >> > } else if (snapshotSplits.size() > 0) {
> >> > MySqlSplit nextSplit = snapshotSplits.poll();
> >> > currentSplitId = nextSplit.splitId();
> >> > currentReader = getSnapshotSplitReader();
> >> > currentReader.submitSplit(nextSplit);
> >> > } else {
> >> > LOG.info("No available split to read.");
> >> > }
> >> > dataIt = currentReader.pollSplitRecords();
> >> > return dataIt == null ? finishedSplit() :
> forRecords(dataIt);
> >> > } else if (currentReader instanceof SnapshotSplitReader) {
> >> >   
> >> > }
> >> > ...
> >> > }
> >>
>


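Re: Flink CDC MySqlSplitReader question

2023-12-20 Thread Hang Ruan
Hi, casel

This logic should only be reached when newly added tables are being handled.
Normally, CDC dispatches the BinlogSplit only after all SnapshotSplits have
been read.
However, if the job is restarted during the incremental phase and some tables
are newly added, a BinlogSplit and SnapshotSplits exist at the same time, and
only then is this logic reached.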
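
The selection order in that situation can be sketched with two plain queues. This is only an illustration of the branch order in the quoted `pollSplitRecords` method; `SplitOrderSketch`, `nextSplit`, and the split names are hypothetical and do not come from the connector.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/** Illustration only: prefers a pending binlog split over pending snapshot splits. */
final class SplitOrderSketch {

    /** Returns the next split to read, or null when nothing is pending. */
    static String nextSplit(Queue<String> binlogSplits, Queue<String> snapshotSplits) {
        if (!binlogSplits.isEmpty()) {
            // Same priority as the first branch of the quoted code.
            return binlogSplits.poll();
        }
        if (!snapshotSplits.isEmpty()) {
            return snapshotSplits.poll();
        }
        return null;
    }

    public static void main(String[] args) {
        // State after a restart with newly added tables: both kinds of split are pending.
        Queue<String> binlog = new ArrayDeque<>(List.of("binlog-split"));
        Queue<String> snapshot = new ArrayDeque<>(List.of("snapshot-1", "snapshot-2"));
        String next;
        while ((next = nextSplit(binlog, snapshot)) != null) {
            System.out.println(next);
        }
    }
}
```

In a normal first run the binlog queue is empty until every snapshot split has finished, so the first branch only matters in the restart case described above.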

Best,
Hang


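key lou wrote on Wed, Dec 20, 2023 at 16:24:

> It means that when there is a binlog split, the snapshot has already been fully read
>
> casel.chen wrote on Tue, Dec 19, 2023 at 16:45:
>
> > While reading the source code of the flink-connector-mysql-cdc project, I ran into
> > something I do not understand. I would appreciate some guidance, thanks!
> >
> >
> > The MySqlSplitReader class contains the code below. I do not understand the comment
> > "(1) Reads binlog split firstly and then read snapshot split".
> > Why read the binlog split before the snapshot split? To preserve record order, shouldn't
> > the full snapshot split be read first and the incremental binlog split afterwards?
> >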
> >
> > private MySqlRecords pollSplitRecords() throws InterruptedException {
> >     Iterator dataIt;
> >     if (currentReader == null) {
> >         // (1) Reads binlog split firstly and then read snapshot split
> >         if (binlogSplits.size() > 0) {
> >             // the binlog split may come from:
> >             // (a) the initial binlog split
> >             // (b) added back binlog-split in newly added table process
> >             MySqlSplit nextSplit = binlogSplits.poll();
> >             currentSplitId = nextSplit.splitId();
> >             currentReader = getBinlogSplitReader();
> >             currentReader.submitSplit(nextSplit);
> >         } else if (snapshotSplits.size() > 0) {
> >             MySqlSplit nextSplit = snapshotSplits.poll();
> >             currentSplitId = nextSplit.splitId();
> >             currentReader = getSnapshotSplitReader();
> >             currentReader.submitSplit(nextSplit);
> >         } else {
> >             LOG.info("No available split to read.");
> >         }
> >         dataIt = currentReader.pollSplitRecords();
> >         return dataIt == null ? finishedSplit() : forRecords(dataIt);
> >     } else if (currentReader instanceof SnapshotSplitReader) {
> >
> >     }
> >     ...
> > }
>


Re: Unsubscribe

2023-12-18 Thread Hang Ruan
Please send an email with any content to user-unsubscr...@flink.apache.org if
you want to unsubscribe from the u...@flink.apache.org mailing list. You can
refer to [1][2] for more details.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

唐大彪 wrote on Mon, Dec 18, 2023 at 23:44:

> Unsubscribe
>


Re: Unsubscribe

2023-12-18 Thread Hang Ruan
Please send an email with any content to user-unsubscr...@flink.apache.org if
you want to unsubscribe from the user@flink.apache.org mailing list. You can
refer to [1][2] for more details.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

唐大彪 wrote on Mon, Dec 18, 2023 at 23:44:

> Unsubscribe
>


Re: Flink KafkaProducer Failed Transaction Stalling the whole flow

2023-12-18 Thread Hang Ruan
Hi, Dominik.

From the code, your sink received an InvalidTxnStateException in
KafkaCommitter[1]. The Kafka connector treats it as a known exception and
invokes `signalFailedWithKnownReason`.
`signalFailedWithKnownReason` does not throw an exception. It lets the
committer decide whether to fail or retry.
The Kafka sink does not throw an exception after `signalFailedWithKnownReason`,
so the job will not fail because of this InvalidTxnStateException.

I am not sure why the known exception is ignored in the Kafka sink.

Best,
Hang

[1]
https://github.com/apache/flink-connector-kafka/blob/825052f55754e401176083c121ffaf38362b7a26/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java#L96C22-L96C46


Re: Is the kafka-connector doc missing a dependency on flink-connector-base

2023-12-05 Thread Hang Ruan
Hi, Jean-Marc Paulin.

The flink-connector-base will not be packaged in the externalized
connectors [1].
The flink-connector-base has been included in flink-dist and we should use
the provided scope in maven for it.

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-30400

Jean-Marc Paulin wrote on Tue, Dec 5, 2023 at 01:12:

> Hi,
>
> Trying to update the kafka connector to my project and I am missing a
> class. Is the doc missing a dependency on flink-connector-base ?
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-base</artifactId>
>   <scope>compile</scope>
> </dependency>
>
> I added it and it works. I think that's required, but I would have expected
> this dependency to be listed on the "Kafka | Apache Flink" page.
> Thanks
>
> JM
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>


Re: Elasticsearch connector support?

2023-11-27 Thread Hang Ruan
Hi, Lasse.

There is already a discussion about the connector releases for 1.18[1].

Best,
Hang

[1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2

Lasse Nedergaard wrote on Fri, Nov 24, 2023 at 22:57:

> Hi
>
> From the documentation I can see there isn’t any ES support in Flink 1.18
> right now and Flink-26088 (ES 8 support) is still open.
>
> Does anyone has an idea when ES connector support will be available in 1.18
>
> Please let me know.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>


Re: Confluent Kafka conection error

2023-11-23 Thread Hang Ruan
Hi, Tauseef.

This error does not mean that you cannot access the Kafka cluster. It actually
means that the JobManager (JM) cannot reach its TaskManager (TM).
Have you checked whether the JM is able to access the TM?

Best,
Hang

Tauseef Janvekar wrote on Thu, Nov 23, 2023 at 16:04:

> Dear Team,
>
> We are facing the below issue while connecting to confluent kafka
> Can someone please help here.
>
> 2023-11-23 06:09:36,989 INFO  org.apache.flink.runtime.executiongraph.
> ExecutionGraph   [] - Source: src_source -> Sink: Print to Std. Out (1
> /1) (496f859d5379cd751a3fc473625125f3_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
> switched from SCHEDULED to DEPLOYING.
> 2023-11-23 06:09:36,994 INFO  org.apache.flink.runtime.executiongraph.
> ExecutionGraph   [] - Deploying Source: src_source -> Sink: Print to
> Std. Out (1/1) (attempt #0) with attempt id 
> 496f859d5379cd751a3fc473625125f3_cbc357ccb763df2852fee8c4fc7d55f2_0_0
> and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to flink-taskmanager:6122
> -23f057 @ flink-taskmanager.flink.svc.cluster.local (dataPort=46589) with
> allocation id 80fe79389102bd305dd87a00247413eb
> 2023-11-23 06:09:37,011 INFO
>  org.apache.kafka.common.security.authenticator.AbstractLogin [] -
> Successfully logged in.
> 2023-11-23 06:09:37,109 WARN  org.apache.kafka.clients.admin.
> AdminClientConfig [] - The configuration 'key.deserializer'
> was supplied but isn't a known config.
> 2023-11-23 06:09:37,109 WARN  org.apache.kafka.clients.admin.
> AdminClientConfig [] - The configuration 'value.deserializer'
> was supplied but isn't a known config.
> 2023-11-23 06:09:37,110 WARN  org.apache.kafka.clients.admin.
> AdminClientConfig [] - The configuration 'client.id.prefix'
> was supplied but isn't a known config.
> 2023-11-23 06:09:37,110 WARN  org.apache.kafka.clients.admin.
> AdminClientConfig [] - The configuration '
> partition.discovery.interval.ms' was supplied but isn't a known config.
> 2023-11-23 06:09:37,110 WARN  org.apache.kafka.clients.admin.
> AdminClientConfig [] - The configuration
> 'commit.offsets.on.checkpoint' was supplied but isn't a known config.
> 2023-11-23 06:09:37,110 WARN  org.apache.kafka.clients.admin.
> AdminClientConfig [] - The configuration 'enable.auto.commit'
> was supplied but isn't a known config.
> 2023-11-23 06:09:37,111 WARN  org.apache.kafka.clients.admin.
> AdminClientConfig [] - The configuration 'auto.offset.reset'
> was supplied but isn't a known config.
> 2023-11-23 06:09:37,113 INFO  org.apache.kafka.common.utils.AppInfoParser
>  [] - Kafka version: 3.2.2
> 2023-11-23 06:09:37,114 INFO  org.apache.kafka.common.utils.AppInfoParser
>  [] - Kafka commitId: 38c22ad893fb6cf5
> 2023-11-23 06:09:37,114 INFO  org.apache.kafka.common.utils.AppInfoParser
>  [] - Kafka startTimeMs: 1700719777111
> 2023-11-23 06:09:37,117 INFO
>  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator
> [] - Starting the KafkaSourceEnumerator for consumer group null without
> periodic partition discovery.
> 2023-11-23 06:09:37,199 INFO  org.apache.flink.runtime.executiongraph.
> ExecutionGraph   [] - Source: src_source -> Sink: Print to Std. Out (1
> /1) (496f859d5379cd751a3fc473625125f3_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
> switched from DEPLOYING to INITIALIZING.
> 2023-11-23 06:09:37,302 INFO  org.apache.flink.runtime.source.coordinator.
> SourceCoordinator [] - Source Source: src_source registering reader for
> parallel task 0 (#0) @ flink-taskmanager
> 2023-11-23 06:09:37,313 INFO  org.apache.flink.runtime.executiongraph.
> ExecutionGraph   [] - Source: src_source -> Sink: Print to Std. Out (1
> /1) (496f859d5379cd751a3fc473625125f3_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
> switched from INITIALIZING to RUNNING.
> 2023-11-23 06:09:38,713 INFO
>  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator
> [] - Discovered new partitions: [aiops-3, aiops-2, aiops-1, aiops-0,
> aiops-5, aiops-4]
> 2023-11-23 06:09:38,719 INFO
>  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator
> [] - Assigning splits to readers {0=[[Partition: aiops-1, StartingOffset:
> -1, StoppingOffset: -9223372036854775808], [Partition: aiops-2,
> StartingOffset: -1, StoppingOffset: -9223372036854775808], [Partition:
> aiops-0, StartingOffset: -1, StoppingOffset: -9223372036854775808], [
> Partition: aiops-4, StartingOffset: -1, StoppingOffset: -
> 9223372036854775808], [Partition: aiops-3, StartingOffset: -1,
> StoppingOffset: -9223372036854775808], [Partition: aiops-5, StartingOffset:
> -1, StoppingOffset: -9223372036854775808]]}
> 2023-11-23 06:09:57,651 INFO  akka.remote.transport.ProtocolStateActor
>   [] - No response from remote for outbound association.
> Associate timed out after [2 ms].
> 2023-11-23 06:09:57,651 WARN  akka.remote.ReliableDeliverySupervisor
>   [] - Association with 

Re: The generated schema is not correct when using filesystem connector and avro format

2023-11-19 Thread Hang Ruan
Hi, julia.

I have read the code about this part. The problem as you said is that the
RowType passed to the avro-confluent format is nullable, which will cause
union with null in the schema.
I think FLINK-30438 is the same problem as yours. But I find the RowType
passed to avro-confluent format in Kafka connector is not nullable (from
`getCatalogTable().getResolvedSchema().toPhysicalRowDataType()`).

For your case, you have to modify the code in FileSystemTableSink.java#L299 to
provide a non-nullable RowType. Or you could raise a fix for the filesystem
connector.

Best,
Hang


julia bogdan wrote on Wed, Nov 15, 2023 at 01:10:

> Hi!
>
> I'm facing an issue with the output schema for FileSystemTableSink.
> In FileSystemTableSink#createWriter (FileSystemTableSink.java#L299)
> the original nullability of the underlying logical data type is not
> preserved, which introduces an unnecessary union with null in the schema,
> i.e. for avro, it generates [null, {"type":"record", "fields": ...}]
> instead of {"type":"record", "fields": ...}.
> https://issues.apache.org/jira/browse/FLINK-30438 describes the same
> problem, but not sure if the root cause is the same.
> We use Flink 1.16.0, but it's relevant for newer versions.
>
>
> Looking at the source code, the issue exists because DataType::ROW
> instantiates RowType with isNullable = true by default (see the constructor).
> Similar DataType creation is followed by a nullability check and a call to
> .notNull() in DataTypeUtils.
> I wonder whether someone had the same issue and whether there is a
> workaround.
>
>
> Thank you,
> Yuliya
>


Re: Handling default fields in Avro messages using Flink SQL

2023-11-13 Thread Hang Ruan
Hi, Dale.

I think there are two choices to try.
1. As suggested in the reply in FLINK-22427[1], use the SQL function `COALESCE`.
2. Modify the code in the Avro format yourself.

Choice 2 requires some work. First, you need to pass the default value in
the Schema, which does not contain the default value now. Then
you need to modify the AvroRowDataDeserializationSchema to return the
default value when the field is null.
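
Both choices come down to the same rule: use the declared default whenever the decoded value is null. The following is a plain-Java sketch of that rule only. It is not the Avro format code; `DefaultFiller` and `withDefaults` are hypothetical names, and the field names are taken from the schema in the question.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: substitutes schema defaults for missing or null field values. */
final class DefaultFiller {

    /** Returns a copy of record in which null or absent fields take their declared default. */
    static Map<String, Object> withDefaults(
            Map<String, Object> record, Map<String, Object> defaults) {
        Map<String, Object> result = new HashMap<>(record);
        for (Map.Entry<String, Object> entry : defaults.entrySet()) {
            // HashMap#putIfAbsent also replaces keys that are mapped to null.
            result.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("favouritePhrase", "Hello World");
        defaults.put("favouriteNumber", 42);

        Map<String, Object> record = new HashMap<>();
        record.put("favouritePhrase", null); // decoded as null
        record.put("isItTrue", true);        // favouriteNumber is missing entirely

        System.out.println(withDefaults(record, defaults));
    }
}
```

In SQL, `COALESCE(favouritePhrase, 'Hello World')` expresses the same per-field fallback without touching the format code.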

Best,
Hang

[1]  https://issues.apache.org/jira/browse/FLINK-22427

Dale Lane wrote on Tue, Nov 14, 2023 at 01:33:

> I have a Kafka topic with events produced using an Avro schema like this:
>
>
>
> {
>     "namespace": "demo.avro",
>     "type": "record",
>     "name": "MySimplifiedRecreate",
>     "fields": [
>         {
>             "name": "favouritePhrase",
>             "type": "string",
>             "default": "Hello World"
>         },
>         {
>             "name": "favouriteNumber",
>             "type": "int",
>             "default": 42
>         },
>         {
>             "name": "isItTrue",
>             "type": "boolean"
>         }
>     ]
> }
>
>
>
> I want to use the default values in the same way that I do in other Kafka
> consumers. (Specifically, that when a message on the topic is missing a
> value for one of these properties, the default value is used instead).
>
>
>
> e.g.
>
>
>
> CREATE TABLE `simplified-recreate`
> (
>     `favouritePhrase`   STRING DEFAULT 'Hello World',
>     `favouriteNumber`   INT DEFAULT 42,
>     `isItTrue`          BOOLEAN NOT NULL
> )
> WITH (
>     'connector' = 'kafka',
>     'format' = 'avro',
>     ...
>
>
>
> As far as I can see, DEFAULT isn’t available in Flink SQL. (Although I can
> see it was considered before in a different context -
> https://issues.apache.org/jira/browse/FLINK-22427 )
>
>
>
> Is there another way to *process events with missing properties where the
> schema identifies the correct default*?
>
>
>
> Kind regards
>
>
>
> Dale
>
>
>
>
>
>
>
>
>


Re: Kafka Topic Permissions Failure

2023-11-13 Thread Hang Ruan
Hi, Razin.

It seems the issue you shared is a different problem from yours: the two
have different error messages.
Have you tried consuming this topic yourself with the Kafka Java client[1],
to make sure you can access the topic normally?

Best,
Hang

[1] https://developer.confluent.io/get-started/java/#build-consumer

On Sat, Nov 11, 2023 at 04:33, Razin Bouzar via user wrote:

> Hello,
>
> We seem to be encountering a bug wherein we see the error
> TopicAuthorizationException: Not authorized to access topics: [topic] for
> a topic that we have R/W/D access to, and we are using the correct
> principal (MTLS auth). The only somewhat related FLINK bug I found was
> FLINK-27041. Some partitions are empty, but we are running a newer version
> of Flink (1.16.1).
>
> Are there any suggestions on what else to check?
>
>
> *Full stack trace:*
>
> java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     ... 1 more
> Caused by: org.apache.kafka.common.errors.TopicAuthorizationException:
>
> --
> RAZIN BOUZAR
> Monitoring Cloud | Salesforce
> Mobile: 317-502-8995
>
> 
>


Re: Flink Job Failed With Kafka Exception

2023-11-05 Thread Hang Ruan
Hi, Madan.

This error suggests that the consumer runs into problems when it tries to
read the topic metadata. If you use the same source for all of these topics,
the Kafka connector cannot skip one of them. As you say, you would need to
modify the connector's default behavior.
You could start by reading the code in KafkaSourceEnumerator to see where
this error could be skipped.

Best,
Hang

On Mon, Nov 6, 2023 at 14:30, Junrui Lee wrote:

> Hi Madan,
>
> Do you mean you want to restart only the failed tasks, rather than
> restarting the entire pipeline region? As far as I know, currently Flink
> does not support task-level restart, but requires restarting the pipeline
> region.
>
> Best,
> Junrui
>
> On Wed, Oct 11, 2023 at 12:37, Madan D via user wrote:
>
>> Hello Team,
>> We are running a Flink pipeline that consumes data from multiple topics,
>> but we recently found that if one topic has issues with
>> participation, etc., the whole Flink pipeline fails, which
>> affects the other topics. Is there a way to keep the Flink pipeline running
>> even after one of the topics has an issue? We tried handling exceptions to
>> make sure the job wouldn't fail, but it didn't help.
>>
>> Caused by: java.lang.RuntimeException: Failed to get metadata for topics
>>
>>
>> Can you please provide any insights?
>>
>>
>> Regards,
>> Madan
>>
>


Re: [DISCUSS][FLINK-33240] Document deprecated options as well

2023-11-01 Thread Hang Ruan
Thanks for the proposal.

+1 from my side and +1 for putting them to a separate section.

Best,
Hang

On Wed, Nov 1, 2023 at 15:32, Samrat Deb wrote:

> Thanks for the proposal,
> +1 for adding a deprecated identifier.
>
> [Thought] Can we have a separate section / page for deprecated configs? WDYT?
>
>
> Bests,
> Samrat
>
>
> On Tue, 31 Oct 2023 at 3:44 PM, Alexander Fedulov <alexander.fedu...@gmail.com> wrote:
>
> > Hi Zhanghao,
> >
> > Thanks for the proposition.
> > In general +1, this sounds like a good idea as long as it is clear that the
> > usage of these settings is discouraged.
> > Just one minor concern - the configuration page is already very long, do
> > you have a rough estimate of how many more options would be added with
> > this change?
> >
> > Best,
> > Alexander Fedulov
> >
> > On Mon, 30 Oct 2023 at 18:24, Matthias Pohl wrote:
> >
> > > Thanks for your proposal, Zhanghao Chen. I think it adds more
> > > transparency to the configuration documentation.
> > >
> > > +1 from my side on the proposal
> > >
> > > On Wed, Oct 11, 2023 at 2:09 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
> > >
> > > > Hi Flink users and developers,
> > > >
> > > > Currently, Flink won't generate doc for the deprecated options. This
> > > > might confuse users when upgrading from an older version of Flink: they
> > > > have to either carefully read the release notes or check the source code
> > > > for upgrade guidance on deprecated options.
> > > >
> > > > I propose to document deprecated options as well, with a "(deprecated)"
> > > > tag placed at the beginning of the option description to highlight the
> > > > deprecation status [1].
> > > >
> > > > Looking forward to your feedback on it.
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-33240
> > > >
> > > > Best,
> > > > Zhanghao Chen
> > > >
> > >
> >
>


Re: [ANNOUNCE] Apache Flink 1.18.0 released

2023-10-26 Thread Hang Ruan
Congratulations!

Best,
Hang

On Fri, Oct 27, 2023 at 11:50, Samrat Deb wrote:

> Congratulations on the great release
>
> Bests,
> Samrat
>
> On Fri, 27 Oct 2023 at 7:59 AM, Yangze Guo wrote:
>
> > Great work! Congratulations to everyone involved!
> >
> > Best,
> > Yangze Guo
> >
> > On Fri, Oct 27, 2023 at 10:23 AM Qingsheng Ren wrote:
> > >
> > > Congratulations and big THANK YOU to everyone helping with this release!
> > >
> > > Best,
> > > Qingsheng
> > >
> > > On Fri, Oct 27, 2023 at 10:18 AM Benchao Li wrote:
> > > >
> > > > Great work, thanks everyone involved!
> > > >
> > > > On Fri, Oct 27, 2023 at 10:16, Rui Fan <1996fan...@gmail.com> wrote:
> > > > >
> > > > > Thanks for the great work!
> > > > >
> > > > > Best,
> > > > > Rui
> > > > >
> > > > > On Fri, Oct 27, 2023 at 10:03 AM Paul Lam wrote:
> > > > >
> > > > > > Finally! Thanks to all!
> > > > > >
> > > > > > Best,
> > > > > > Paul Lam
> > > > > >
> > > > > > > On Oct 27, 2023, at 03:58, Alexander Fedulov <alexander.fedu...@gmail.com> wrote:
> > > > > > >
> > > > > > > Great work, thanks everyone!
> > > > > > >
> > > > > > > Best,
> > > > > > > Alexander
> > > > > > >
> > > > > > > On Thu, 26 Oct 2023 at 21:15, Martijn Visser <martijnvis...@apache.org> wrote:
> > > > > > >
> > > > > > > > Thank you all who have contributed!
> > > > > > > >
> > > > > > > > On Thu, 26 Oct 2023 at 18:41, Feng Jin <jinfeng1...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Thanks for the great work! Congratulations
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Feng Jin
> > > > > > > > >
> > > > > > > > > On Fri, Oct 27, 2023 at 12:36 AM Leonard Xu <xbjt...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Congratulations, Well done!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Leonard
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 27, 2023 at 12:23 AM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks for the great work! Congrats all!
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Lincoln Lee
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 27, 2023 at 00:16, Jing Ge wrote:
> > > > > > > > > > >
> > > > > > > > > > > > The Apache Flink community is very happy to announce the release of
> > > > > > > > > > > > Apache Flink 1.18.0, which is the first release for the Apache
> > > > > > > > > > > > Flink 1.18 series.
> > > > > > > > > > > >
> > > > > > > > > > > > Apache Flink® is an open-source unified stream and batch data
> > > > > > > > > > > > processing framework for distributed, high-performing,
> > > > > > > > > > > > always-available, and accurate data applications.
> > > > > > > > > > > >
> > > > > > > > > > > > The release is available for download at:
> > > > > > > > > > > > https://flink.apache.org/downloads.html
> > > > > > > > > > > >
> > > > > > > > > > > > Please check out the release blog post for an overview of the
> > > > > > > > > > > > improvements for this release:
> > > > > > > > > > > > https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
> > > > > > > > > > > >
> > > > > > > > > > > > The full release notes are available in Jira:
> > > > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352885
> > > > > > > > > > > >
> > > > > > > > > > > > We would like to thank all contributors of the Apache Flink
> > > > > > > > > > > > community who made this release possible!
> > > > > > > > > > > >
> > > > > > > > > > > > Best regards,
> > > > > > > > > > > > Konstantin, Qingsheng, Sergey, and Jing
> > > > > > > > > > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> >
>


Re: Unsubscribe from user list.

2023-10-24 Thread Hang Ruan
Hi,
Please send an email to user-unsubscr...@flink.apache.org if you want to
unsubscribe from user@flink.apache.org. You can refer to [1][2]
for more details.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists


On Sat, Oct 21, 2023 at 19:03, bharghavi vajrala wrote:

> Team,
>
> Please unsubscribe my email id.
>
> On Thu, Oct 19, 2023 at 6:25 AM jihe18717838093 <18717838...@126.com>
> wrote:
>
>> Hi team,
>>
>> Could you please remove this email from the subscription list?
>>
>> Thank you!
>>
>> Best,
>> Minglei
>>
>


Re: Is there any example that use hbase connector in stream API

2023-09-25 Thread Hang Ruan
Hi,

There is no HBase connector for the DataStream API, but we can follow the
code of the Table API connector to learn how to use these classes.

1. HBase2DynamicTableFactory[1] shows how to build an HBaseTableSchema.
2. HBaseDynamicTableSink[2] shows how to build a RowDataToMutationConverter.
If you need a different converter, you have to implement your own converter
class.

Best,
Hang

[1]
https://github.com/apache/flink-connector-hbase/blob/main/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
[2]
https://github.com/apache/flink-connector-hbase/blob/main/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java

On Fri, Sep 22, 2023 at 13:06, 碳酸君 wrote:

> hi community:
>
>  I'm trying to write some data to HBase in a stream job with
> flink-connector-hbase-2.2.
>  I have no idea how to instantiate
> org.apache.flink.connector.hbase.util.HBaseTableSchema and
> org.apache.flink.connector.hbase.sink.HBaseMutationConverter.
>
>
> Regards,
> Teii
>


Re: Unsubscribe (退订)

2023-09-11 Thread Hang Ruan
Hi,

Please send an email with any content to user-zh-unsubscr...@flink.apache.org
if you want to unsubscribe from user-zh@flink.apache.org. You can refer to
[1][2] to manage your mail subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On Mon, Sep 11, 2023 at 10:23, 刘海 wrote:

> Unsubscribe
>


Re: Unsubscribe (退订)

2023-09-11 Thread Hang Ruan
Hi,

Please send an email with any content to user-zh-unsubscr...@flink.apache.org
if you want to unsubscribe from user-zh@flink.apache.org. You can refer to
[1][2] to manage your mail subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On Mon, Sep 11, 2023 at 16:45, Hunk <24248...@163.com> wrote:

> Unsubscribe
>
> Sent from my iPhone


Re: Incompatible KafkaProducer version

2023-09-11 Thread Hang Ruan
Hi, Krzysztof.

I found that this part was changed in PR[1], which updates the Kafka
client version to 3.4.0.
This fix has not been released yet. Maybe you can build the connector and
check it yourself.

Best,
Hang

[1] https://github.com/apache/flink-connector-kafka/pull/11

On Sun, Sep 10, 2023 at 21:52, Krzysztof Jankiewicz wrote:

> Hi,
>
> I am currently working on a simple application that requires exactly-once
> end-to-end guarantee.
>
> I am reading data from Kafka and writing it back to Kafka.
>
> When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level,
> everything works fine.
> Here's the relevant code:
>
> KafkaSink sink = KafkaSink.builder()
> . . .
> .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
> . . .
> .build();
>
> Unfortunately, when I switch to DeliveryGuarantee.EXACTLY_ONCE, I
> encounter the following error during error handling (High Availability mode
> in k8s):
>
> Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
>   at
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266)
> ~[flink-connector-kafka-1.17.1.jar:1.17.1]
> . . .
> Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
>
> The code causing this issue is as follows
> (org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer):
>
> Object transactionManager = this.getTransactionManager();
> synchronized (transactionManager) {
>     Object topicPartitionBookkeeper =
>             getField(transactionManager, "topicPartitionBookkeeper");
>     transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
>     invoke(topicPartitionBookkeeper, "reset");
>     setField(transactionManager, "producerIdAndEpoch",
>             createProducerIdAndEpoch(producerId, epoch));
>
> I am using Apache Flink 1.17.1 and Apache Kafka Client
> (org.apache.kafka:kafka-clients) 3.5.1.
> I have examined the code of
> org.apache.kafka.clients.producer.internals.TransactionManager, which is
> used by org.apache.kafka.clients.producer.KafkaProducer.
> I can see the producerIdAndEpoch field, but there is no
> topicPartitionBookkeeper field.
>
> Could you please advise which version of KafkaProducer is compatible with
> the flink-connector-kafka? And am I missing something in my configuration?
>
> Kind regards
> Krzysztof
>


Re: How to read flinkSQL job state

2023-09-06 Thread Hang Ruan
Hi, Yifan.

I think the document[1] suggests converting the DataStream to a
Table[2]. Then we could work with the state using the Table API & SQL.

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/libs/state_processor_api/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

On Wed, Sep 6, 2023 at 13:10, Yifan He via user wrote:

> Hi team,
>
> We are investigating why the checkpoint size of our FlinkSQL jobs keeps
> growing and we want to look into the checkpoint file to know what is
> causing the problem. I know we can use the state processor api to read the
> state of jobs using datastream api, but how can I read the state of jobs
> using table api & sql?
>
> Thanks,
> Yifan
>


Re: How to trigger process function when no event in eventTimeWindow ?

2023-08-27 Thread Hang Ruan
Hi, longfeng.

We could use `withIdleness`[1] to deal with the idle sources.

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources

On Sun, Aug 27, 2023 at 14:01, longfeng Xu wrote:

> Hello,
>
> The issue I’m encountering is about the following:
> 1. Aggregating product sales for each minute. The sales data comes from
> Kafka with event time.
> 2. If there is no data in a given minute, the program should produce a
> default zero.
> 3. All times I mention are event time. Processing time is not used, in
> consideration of reruns.
> 4. When using a Flink event-time window, the process function cannot be
> triggered if no event arrives in that window.
>
> For example, for a product named A with two input records at
> 9:25:12 and 9:28:23, how can I output zero between 9:25 and 9:28 with an
> event-time window?
>
> Best regards,
> Xlf
>


Re: How to use pipeline.jobvertex-parallelism-overrides property.

2023-08-25 Thread Hang Ruan
Hi, Krzysztof.

As liu ron said, the key of the map for this configuration is the value
of JobVertexID#toHexString. Maybe we could improve the docs to provide
more details.
Two operators must have the same parallelism to be chained. If their
parallelisms differ, we cannot chain them together.
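
To make the key format concrete, here is a minimal sketch in plain Java that renders vertex-id-to-parallelism pairs as a value for this option. It assumes that the option uses Flink's usual `key1:value1,key2:value2` syntax for map-typed options and that the hex string is 32 lowercase hex characters; the two ids below are hypothetical and would have to be taken from your own job (for example from the web UI or the REST API).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class ParallelismOverridesSketch {

    /** Renders vertex-id -> parallelism pairs as "id1:p1,id2:p2". */
    public static String toConfigValue(Map<String, Integer> overrides) {
        StringJoiner joiner = new StringJoiner(",");
        for (Map.Entry<String, Integer> entry : overrides.entrySet()) {
            String vertexId = entry.getKey();
            // Assumption: the hex form of a job vertex id is 32 lowercase hex characters.
            if (!vertexId.matches("[0-9a-f]{32}")) {
                throw new IllegalArgumentException("Not a 32-character hex vertex id: " + vertexId);
            }
            joiner.add(vertexId + ":" + entry.getValue());
        }
        return joiner.toString();
    }

    /** Builds one configuration line for two hypothetical job vertices. */
    public static String exampleConfigLine() {
        Map<String, Integer> overrides = new LinkedHashMap<>();
        overrides.put("bc764cd8ddf7a0cff126f51c16239658", 4); // hypothetical vertex id
        overrides.put("0a448493b4782967b150582570326227", 2); // hypothetical vertex id
        return "pipeline.jobvertex-parallelism-overrides: " + toConfigValue(overrides);
    }

    public static void main(String[] args) {
        System.out.println(exampleConfigLine());
    }
}
```

Each key refers to a whole job vertex, so a chain such as Map1 and Map2 shares a single entry.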

Best,
Hang

On Fri, Aug 25, 2023 at 09:34, liu ron wrote:

> Hi, Krzysztof
>
> As stated in the description section, this option is used to override the
> parallelism of a JobVertex, where the key is JobVertex id, you can see [1]
> for double check. A JobVertex may contain more than one operator, so we
> cannot override the parallelism of a given operator alone. One possible
> solution to your problem is to leave Map1 and Map2 unchained and put them
> into two Vertexes so that they can override their parallelism separately.
>
> [1]
> https://github.com/apache/flink/blob/b3fb1421fe86129a4e0b10bf3a46704b7132e775/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1591
>
> Best,
> Ron
>
> On Thu, Aug 24, 2023 at 20:08, Krzysztof Chmielewski wrote:
>
>> Hi,
>> has anyone used the pipeline.jobvertex-parallelism-overrides [1] property?
>>
>> I wonder what actually should be a key here? Operator name?
>>
>> What if my operators are chained and I want to override only one of its
>> elements. For example Source -> (Map1 chained with Map2) -> Sink. Can I
>> override Map2 only, keeping Map1 as is? If not, what should be used as key
>> for this chained Map1/Map2 operator then?
>>
>> Thanks.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-jobvertex-parallelism-overrides
>>
>


Re: Re-start strategy without checkpointing enabled

2023-08-23 Thread Hang Ruan
Hi, Kamal.

If we don't enable checkpointing, the job starts from the position defined
by the startup mode every time it restarts.
For example, suppose the job reads Kafka from the earliest offset and writes
to MySQL. If the job fails over without checkpointing, the tasks will consume
Kafka from the earliest offset again.

I think it is best to enable checkpointing so that the job restarts from the
position where it stopped reading.
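
The difference can be illustrated with a toy simulation in plain Java. This is not Flink API: the "log" is just the offsets 0..logSize-1, a single failure is injected right before one offset, and all names are made up for the sketch. Without checkpoints the reader goes back to the startup position (offset 0, standing in for "earliest"); with checkpoints it resumes from the last recorded offset.

```java
import java.util.ArrayList;
import java.util.List;

public class RestartReplaySketch {

    /**
     * Reads offsets [0, logSize) and fails exactly once, right before reading failAt.
     * A checkpointInterval <= 0 means that checkpointing is disabled.
     * Returns every offset that was emitted downstream, including replayed ones.
     */
    public static List<Integer> run(int logSize, int failAt, int checkpointInterval) {
        List<Integer> emitted = new ArrayList<>();
        int restorePosition = 0; // startup position, i.e. "earliest"
        boolean failedOnce = false;
        int offset = 0;
        while (offset < logSize) {
            if (!failedOnce && offset == failAt) {
                failedOnce = true;
                offset = restorePosition; // restart from the last checkpoint, if there is one
                continue;
            }
            emitted.add(offset);
            offset++;
            if (checkpointInterval > 0 && offset % checkpointInterval == 0) {
                restorePosition = offset; // a "checkpoint" remembers the next offset to read
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("no checkpointing:   " + run(6, 4, 0));
        System.out.println("checkpoint every 2: " + run(6, 5, 2));
    }
}
```

In the first call all four offsets read before the failure are emitted a second time; in the second call only the offset read after the last checkpoint is replayed.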

Best,
Hang

On Wed, Aug 23, 2023 at 18:46, Kamal Mittal via user wrote:

> Hello,
>
> If checkpointing is NOT enabled and a restart strategy is configured, does
> Flink retry the whole job execution? In other words, is enabling
> checkpointing a must for retries or not?
>
> Rgds,
> Kamal
>


Re: [Question] Good way to monitor data skewness

2023-08-16 Thread Hang Ruan
Hi, Dennis.

As Ron said, we can identify this situation from the metrics.
We usually report the metrics to an external system such as Prometheus
through a metric reporter[1], and then display them with a tool such as
Grafana[2].

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/#prometheus
[2] https://grafana.com/oss/

On Wed, Aug 16, 2023 at 14:39, liu ron wrote:

> Hi, Dennis,
>
> Although all operators are chained together, the metrics of each operator
> are still there. You can view the metrics for the corresponding operator's
> input and output records through the UI, as follows:
> [image: image.png]
>
>
> Best,
> Ron
>
> On Wed, Aug 16, 2023 at 14:13, Dennis Jung wrote:
>
>> Hello people,
>> I'm trying to monitor data skewness between TaskManagers with the 'web-ui'.
>>
>> Currently all operators have been chained, so I cannot find out how data is
>> skewed across TaskManagers (or subtasks). But if I disable chaining,
>> AFAIK, it can degrade performance.
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups
>>
>> Could someone give a good suggestion on how to monitor whether data skew
>> happens or not in a Flink app?
>>
>> Thank you.
>>


Re: Conversion expects insert-only records but DataStream API record contains: UPDATE_BEFORE

2023-08-14 Thread Hang Ruan
Hi,

Changelog mode is a concept of the Table API. You can convert a changelog
stream into a Table with StreamTableEnvironment#fromChangelogStream.
For the Kafka connector, the changelog mode depends on the format used.

Best,
Hang

On Sun, Aug 13, 2023 at 22:06, liu ron wrote:

> Hi,
>
> After a deep dive into the source code, I guess you are using the
> StreamTableEnvironment#fromDataStream method, which only supports
> insert-only messages. For your case, I think you should use
> StreamTableEnvironment#fromChangelogStream[1], which supports consuming
> update rows.
>
> [1]
> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java#L317
>
> Best,
> Ron
>
> On Sat, Aug 12, 2023 at 02:29, 完结篇 <2366123...@qq.com> wrote:
>
>> Flink:1.15.2
>>
>> I am now going to change the data stream from *DataStream* to
>> *DataStream*
>>
>> Already implemented (*insert only works fine*), but when
>> DataStream contains *update *information
>>
>> The error is:
>> *Caused by: org.apache.flink.util.FlinkRuntimeException: Error during
>> input conversion. Conversion expects insert-only records but DataStream API
>> record contains: UPDATE_BEFORE*
>>     at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>     *at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:180)*
>>     at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:160)
>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
>>     at com.test.KafkaFlink$1.processElement(KafkaFlink.java:118)
>>     at com.test.KafkaFlink$1.processElement(KafkaFlink.java:107)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> *kafkaflink.java:179-180 lines of code*
>>
>> Row before = ChangelogToRowUtil.extractRow(RowKind.UPDATE_BEFORE,
>>         beforeObject, rowTypeInfo);
>> collector.collect(before);
>>
>> The before data output is -U[1, test, 123-456-789]
>>
>> I would like to know : How to convert the stream containing *update* data
>> from *DataStream* to *DataStream*
>>
>


Re: Global/Shared objects

2023-08-10 Thread Hang Ruan
Hi, Kamal.

Each TaskManager is a JVM process, and each task slot is a thread of that
TaskManager. See [1] for more information.
Static fields can be shared among subtasks in the same TaskManager.
Subtasks running in different TaskManagers cannot share static fields.
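
A minimal sketch in plain Java of what this sharing means inside one JVM. It uses ordinary threads rather than Flink subtasks, and the class and method names are made up for illustration. Note that a static field exists once per classloader, so depending on how the job is deployed, the sharing may in practice be limited to the subtasks of one job in one TaskManager.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedStaticSketch {

    // Exists once per JVM (strictly: once per classloader that loads this class).
    static final AtomicInteger SHARED_COUNTER = new AtomicInteger();

    /** Starts the given number of threads; each one increments the shared static counter. */
    public static int incrementFromThreads(int threads, int incrementsPerThread) {
        SHARED_COUNTER.set(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    SHARED_COUNTER.incrementAndGet();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return SHARED_COUNTER.get();
    }

    public static void main(String[] args) {
        System.out.println(incrementFromThreads(4, 1000));
    }
}
```

Because every thread sees the same field, shared static objects need to be thread-safe; the sketch uses an AtomicInteger for that reason.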

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/flink-architecture/#task-slots-and-resources

On Fri, Aug 11, 2023 at 12:53, Kamal Mittal via user wrote:

> Hello,
>
> Is it possible to create global/shared objects like static which are
> shared among slots in a task manager?
>
> Is it ok to create such objects in flink?
>
> Rgds,
> Kamal
>


Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Hang Ruan
PS: I forgot the link: Hybrid Source[1]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/hybridsource/

On Fri, Aug 11, 2023 at 10:14, Hang Ruan wrote:

> Hi, Muazim.
>
> I think the Hybrid Source[1] may be helpful for your case.
>
> Best,
> Hang
>
> On Fri, Aug 11, 2023 at 04:18, Ken Krugler wrote:
>
>> As (almost) always, the devil is in the details.
>>
>> You haven’t said, but I’m assuming you’re writing out multiple files,
>> each with a different schema, as otherwise you could just leverage the
>> existing Flink support for CSV.
>>
>> So then you could combine the header/footer streams (adding a flag for
>> header vs. footer), and connect that to the row data stream, then use a
>> KeyedCoProcessFunction (I’m assuming you can key by something that
>> identifies which schema). You’d buffer the row data & footer (separately in
>> state). You would also need to set up a timer to fire at the max watermark,
>> to flush out pending rows/footer when all of the input data has been
>> processed.
>>
>> After that function you could configure the sink to bucket by the target
>> schema.
>>
>> — Ken
>>
>>
>> On Aug 10, 2023, at 10:41 AM, Muazim Wani  wrote:
>>
>> Thanks for the response!
>> I have a specific use case where I am writing to a text file sink. I have
>> a bounded stream of header data and need to merge it with another bounded
>> stream. When writing to the text file, the header data should be written
>> before the main data (from the other bounded stream). Finally, I have a
>> third stream of footers, for which I would repeat the same process.
>> I tried tagging all three streams with an identifier and, based on these
>> identifiers, adding the data to three different ListStates in a
>> KeyedProcessFunction. Headers are emitted directly, while the main data
>> and footers are stored in state. The issue is that outside the
>> KeyedProcessFunction I am not able to emit the data in order.
>> Is there any way to order the DataStreams? I also tried union, but it
>> seems to interleave data from the streams arbitrarily.
>> Thanks and regards
>>
>> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, 
>> wrote:
>>
>>> Hi Muazim,
>>>
>>> In Flink, a stream of data (unless bounded) is assumed to never end.
>>>
>>> So in your example below, this means stream 2 would NEVER be emitted,
>>> because stream 1 would never end (there is no time at which you know for
>>> sure that stream 1 is done).
>>>
>>> And this in turn means stream 2 would be buffered forever in state, thus
>>> growing unbounded.
>>>
>>> I would suggest elaborating on your use case.
>>>
>>> Regards,
>>>
>>> — Ken
>>>
>>>
>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani  wrote:
>>>
>>> Hi Team,
>>> I have a use case where I have two streams and want to join them in a
>>> stateful manner.
>>> E.g., data of stream 1 should be emitted before stream 2.
>>> I tried to store the data in ListState in a KeyedProcessFunction, but I am
>>> not able to access the state outside processElement().
>>> Is there any way I could achieve this?
>>> Thanks and regards
>>>
>>>
>>> --
>>> Ken Krugler
>>> http://www.scaleunlimited.com
>>> Custom big data solutions
>>> Flink, Pinot, Solr, Elasticsearch
>>>
>>>
>>>
>>>
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>>
>>
>>
>>


Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Hang Ruan
Hi, Muazim.

I think the Hybrid Source[1] may be helpful for your case.

Best,
Hang

Ken Krugler wrote on Fri, Aug 11, 2023 at 04:18:

> As (almost) always, the devil is in the details.
>
> You haven’t said, but I’m assuming you’re writing out multiple files, each
> with a different schema, as otherwise you could just leverage the existing
> Flink support for CSV.
>
> So then you could combine the header/footer streams (adding a flag for
> header vs. footer), and connect that to the row data stream, then use a
> KeyedCoProcessFunction (I’m assuming you can key by something that
> identifies which schema). You’d buffer the row data & footer (separately in
> state). You would also need to set up a timer to fire at the max watermark,
> to flush out pending rows/footer when all of the input data has been
> processed.
>
> After that function you could configure the sink to bucket by the target
> schema.
>
> — Ken
>
>
> On Aug 10, 2023, at 10:41 AM, Muazim Wani  wrote:
>
> Thanks for the response!
> I have a specific use case where I am writing to a text file sink. I have a
> bounded stream of header data and need to merge it with another bounded
> stream. When writing to the text file, the header data should be written
> before the main data (from the other bounded stream). Finally, I have a
> third stream of footers, for which I would repeat the same process.
> I tried tagging all three streams with an identifier and, based on these
> identifiers, adding the data to three different ListStates in a
> KeyedProcessFunction. Headers are emitted directly, while the main data
> and footers are stored in state. The issue is that outside the
> KeyedProcessFunction I am not able to emit the data in order.
> Is there any way to order the DataStreams? I also tried union, but it
> seems to interleave data from the streams arbitrarily.
> Thanks and regards
>
> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, 
> wrote:
>
>> Hi Muazim,
>>
>> In Flink, a stream of data (unless bounded) is assumed to never end.
>>
>> So in your example below, this means stream 2 would NEVER be emitted,
>> because stream 1 would never end (there is no time at which you know for
>> sure that stream 1 is done).
>>
>> And this in turn means stream 2 would be buffered forever in state, thus
>> growing unbounded.
>>
>> I would suggest elaborating on your use case.
>>
>> Regards,
>>
>> — Ken
>>
>>
>> On Aug 10, 2023, at 10:11 AM, Muazim Wani  wrote:
>>
>> Hi Team,
>> I have a use case where I have two streams and want to join them in a
>> stateful manner.
>> E.g., data of stream 1 should be emitted before stream 2.
>> I tried to store the data in ListState in a KeyedProcessFunction, but I am
>> not able to access the state outside processElement().
>> Is there any way I could achieve this?
>> Thanks and regards
>>
>>
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>>
>>
>>
>>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>


Re: Custom connector Update flink 1.13.3 failed notsuchMethod

2023-08-01 Thread Hang Ruan
Hi, longfeng.

I think you should rebuild your connector against the new API. The
return type of the method `DynamicTableFactory$Context.getCatalogTable()`
was changed from `CatalogTable` to `ResolvedCatalogTable`[1].

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-21913

longfeng Xu wrote on Wed, Aug 2, 2023 at 08:26:

> Flink 1.13.3
>
> The custom connector is based on the Flink Kafka connector code with a
> little refactoring.
>
> The custom connector could be loaded in Flink 1.12 when using
> StreamTableEnvironment.
>
> Now Flink has been upgraded to 1.13.3, and the custom connector's
> dependencies have also been upgraded to 1.13.3.
>
> But it fails to load:
>
> java.lang.NoSuchMethodError: 'org.apache.flink.table.catalog.CatalogTable
> org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()
>
> I tried many approaches (e.g. shading the package into a fat jar, like the
> demo on the website), but all of them failed.
>
> Thanks for your help
>


Re: Kafka Exception

2023-07-31 Thread Hang Ruan
Hi, Kenan.

Maybe you should set the `client.id.prefix` to avoid the conflict.
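
A minimal sketch of what that could look like, using only java.util so it
runs on its own. The helper and the source names ("periodic",
"wifi-interface") are made up for illustration; the group id is taken from
the log in the quoted mail. As far as I know, the resulting properties can
be passed to `KafkaSourceBuilder#setProperties`, or the prefix can be set
directly with `setClientIdPrefix`, but please verify this against your
connector version.

```java
import java.util.Properties;

// Builds consumer properties in which every source gets its own
// client.id.prefix even though all sources share one group.id.
final class ClientIdPrefixSketch {

    // `sourceName` is a hypothetical per-source label chosen by the job author.
    static Properties consumerProperties(String groupId, String sourceName) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // Option key of the Flink KafkaSource; a distinct value per source
        // avoids identical client ids within one JVM.
        props.setProperty("client.id.prefix", groupId + "-" + sourceName);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties("wifialgogroup1", "periodic"));
        System.out.println(consumerProperties("wifialgogroup1", "wifi-interface"));
    }
}
```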

Best,
Hang

liu ron wrote on Mon, Jul 31, 2023 at 19:36:

> Hi, Kenan
>
> After studying the source code and searching Google for related
> information, I think this is caused by a duplicate client_id [1]. You
> can check whether other jobs use the same group_id to consume this
> topic. The group_id is used in Flink to assemble the client_id [2], so if
> other jobs already use the same group_id, the duplicated client_id will be
> detected on the Kafka side.
>
> [1]
> https://stackoverflow.com/questions/40880832/instancealreadyexistsexception-coming-from-kafka-consumer
> [2]
> https://github.com/apache/flink-connector-kafka/blob/79ae2d70499f81ce489911956c675354657dd44f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L466
>
> Best,
> Ron
>
> Kenan Kılıçtepe wrote on Tue, Jul 25, 2023 at 21:48:
>
>>
>>
>>
>>
>> Any help is appreciated about the exception below.
>> Also my Kafkasource code is below. The parallelism is 16 for this task.
>>
>> KafkaSource<String> sourceStationsPeriodic = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>> // Our Kafka Source
>> KafkaSource<String> sourceStationsWifiInterface = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI_INTERFACE))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>> KafkaSource<String> sourceTwinMessage = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_TWIN_MESSAGE))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceStationsOnDemand = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_STATIONS_ON_DEMAND))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceDeviceInfo = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_DEVICE_INFO))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>>
>>
>> 2023-07-23 07:06:24,927 WARN  org.apache.kafka.common.utils.AppInfoParser
>>  [] - Error registering AppInfo mbean
>> javax.management.InstanceAlreadyExistsException:
>> kafka.admin.client:type=app-info,id=wifialgogroup1-enumerator-admin-client
>> at
>> com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
>> ~[?:?]
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
>> ~[?:?]
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
>> ~[?:?]
>> at
>> 

Re: Flink netty connector for TCP source

2023-07-31 Thread Hang Ruan
Hi, Kamal.

The SplitEnumerator is held by the SourceCoordinator. Both run only in
the JobManager (JM).

Best,
Hang

Kamal Mittal via user wrote on Tue, Aug 1, 2023 at 10:43:

> Thanks.
>
>
>
> I looked at the link for custom data sources. One question: how can I
> make sure that the split enumerator executes on the JobManager rather than
> on a TaskManager?
>
>
>
> *From:* liu ron 
> *Sent:* 31 July 2023 10:06 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi, Kamal
>
> Currently, we don't provide a connector like a TCP source in the Flink main
> repo. If you need this connector, you can try to implement it by referring
> to the FLIP-27 source docs [1].
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
>
>
>
> Best,
>
> Ron
>
>
>
> Shammon FY wrote on Thu, Jul 27, 2023 at 11:23:
>
> I cannot find any information about a netty source on the Flink website, and
> it is not in the connector list[1], so I think it is not supported by the
> Flink community.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Thu, Jul 27, 2023 at 10:53 AM Kamal Mittal 
> wrote:
>
> Hello Shammon,
>
>
>
> Yes, I am aware of the socket text stream, but I was wondering whether
> something like
> 'https://github.com/apache/bahir-flink/tree/master/flink-connector-netty'
> is also supported by Flink?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Shammon FY 
> *Sent:* 27 July 2023 08:15 AM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi Kamal,
>
>
>
> There's socket text stream in `DataStream` and you can refer to [1] for
> more details.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/#example-program
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Wed, Jul 26, 2023 at 4:26 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Does Flink provide a netty connector for a custom TCP source?
>
>
>
> Any documentation details please share?
>
>
>
> Rgds,
>
> Kamal
>
>


Re: Logs of Kafka payload in Table Api connector of Apache Flink

2023-07-20 Thread Hang Ruan
Hi, Elakiya.

If you want to log some information about the kafka records, you can add
some logs in KafkaRecordEmitter.
If you want to know the information about the deserialized value, you
should add logs in the avro format.

Best,
Hang

elakiya udhayanan wrote on Wed, Jul 19, 2023 at 19:44:

> Hi Team,
>
> I am using the upsert-kafka table API connector of Apache Flink to consume
> events from a kafka topic, I want to log the kafka payloads that are
> consumed. Is there a way to log it?
>
> My code looks as below:
>
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().inStreamingMode().build();
> TableEnvironment tEnv = TableEnvironment.create(settings);
> String statement = "CREATE TABLE Employee (\r\n" +
> "  employee  ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (employeeId) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
> ")";
> tEnv.executeSql(statement);
>
> I added log4j.properties to enable log but it did not work. Any help is
> appreciated.
>


Re: Query on Flink SQL primary key for nested field

2023-07-10 Thread Hang Ruan
Hi, Elakiya.

If everything is right for the KafkaTable, I think there must be a
`user_id` field in the Kafka message key.
We could see the code in the method `createKeyValueProjections` of
`UpsertKafkaDynamicTableFactory` as follows.

```
private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
    ResolvedSchema schema = catalogTable.getResolvedSchema();
    // primary key should validated earlier
    List<String> keyFields = schema.getPrimaryKey().get().getColumns();
    DataType physicalDataType = schema.toPhysicalRowDataType();

    Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions());
    // upsert-kafka will set key.fields to primary key fields by default
    tableOptions.set(KEY_FIELDS, keyFields);

    int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
    int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);

    return Tuple2.of(keyProjection, valueProjection);
}
```
The primary key columns are put into the KEY_FIELDS option to create the key
format projection, which is used to read those fields from the Kafka message
key.

Best,
Hang

elakiya udhayanan wrote on Mon, Jul 10, 2023 at 16:41:

> Hi Hang,
>
>  The select query works fine absolutely, we have also implemented join
> queries which also works without any issues.
>
> Thanks,
> Elakiya
>
> On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan  wrote:
>
>> Hi, Elakiya.
>>
>> This DDL may well execute. Please try running the query `select *
>> from KafkaTable`. I expect either an error or that `user_id` is not
>> read correctly.
>>
>> Best,
>> Hang
>>
>> elakiya udhayanan wrote on Mon, Jul 10, 2023 at 16:25:
>>
>>> Hi Hang Ruan,
>>>
>>> Thanks for your response. But the documentation has an example of
>>> defining the primary key in the DDL statement (code below). In that
>>> case, we should be able to define the primary key in the DDL, right? We
>>> have defined the primary key in our earlier use cases when it wasn't a
>>> nested field. Please correct me if I have misunderstood anything.
>>>
>>> CREATE TABLE KafkaTable (
>>>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>>>   `user_id` BIGINT,
>>>   `item_id` BIGINT,
>>>   `behavior` STRING,
>>>   PRIMARY KEY (`user_id`) NOT ENFORCED
>>> ) WITH (
>>>   'connector' = 'upsert-kafka',
>>>   ...
>>>   'key.format' = 'json',
>>>   'key.json.ignore-parse-errors' = 'true',
>>>   'value.format' = 'json',
>>>   'value.json.fail-on-missing-field' = 'false',
>>>   'value.fields-include' = 'EXCEPT_KEY'
>>> )
>>>
>>> Thanks,
>>> Elakiya
>>>
>>> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan 
>>> wrote:
>>>
>>>> Hi, elakiya.
>>>>
>>>> The upsert-kafka connector will read the primary keys from the Kafka
>>>> message keys. We cannot define the fields in the Kafka message values as
>>>> the primary key.
>>>>
>>>> Best,
>>>> Hang
>>>>
>>>> elakiya udhayanan wrote on Mon, Jul 10, 2023 at 13:49:
>>>>
>>>>> Hi team,
>>>>>
>>>>> I have a Kafka topic named employee which uses confluent avro schema
>>>>> and will emit the payload as below:
>>>>>
>>>>> {
>>>>> "employee": {
>>>>> "id": "123456",
>>>>> "name": "sampleName"
>>>>> }
>>>>> }
>>>>> I am using the upsert-kafka connector to consume the events from the
>>>>> above Kafka topic as below using the Flink SQL DDL statement, also here I
>>>>> want to use the id field as the Primary key. But I am unable to use the id
>>>>> field since it is inside the object.
>>>>>
>>>>> DDL Statement:
>>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>>> "  employee  ROW(id STRING, name STRING\r\n" +
>>>>> "  ),\r\n" +
>>>>> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>>>> ") WITH (\r\n" +
>>>>> "  'connector' = 'upsert-kafka',\r\n" +
>>>>> "  'topic' = 'employee',\r\n" +
>>>>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>>> "  'key.format' = 'raw',\r\n" +
>>>>> "  'value.format' = 'avro-confluent',\r\n" +
>>>>> "  'value.avro-confluent.url' = 
>>>>> 'http://kafka-cp-schema-registry:8081',\r\n"
>>>>> +
>>>>> ")";
>>>>> Any help is appreciated TIA
>>>>>
>>>>> Thanks,
>>>>> Elakiya
>>>>>
>>>>


Re: Query on Flink SQL primary key for nested field

2023-07-10 Thread Hang Ruan
Hi, Elakiya.

This DDL may well execute. Please try running the query `select *
from KafkaTable`. I expect either an error or that `user_id` is not
read correctly.

Best,
Hang

elakiya udhayanan wrote on Mon, Jul 10, 2023 at 16:25:

> Hi Hang Ruan,
>
> Thanks for your response. But the documentation has an example of
> defining the primary key in the DDL statement (code below). In that
> case, we should be able to define the primary key in the DDL, right? We
> have defined the primary key in our earlier use cases when it wasn't a
> nested field. Please correct me if I have misunderstood anything.
>
> CREATE TABLE KafkaTable (
>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   PRIMARY KEY (`user_id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   ...
>   'key.format' = 'json',
>   'key.json.ignore-parse-errors' = 'true',
>   'value.format' = 'json',
>   'value.json.fail-on-missing-field' = 'false',
>   'value.fields-include' = 'EXCEPT_KEY'
> )
>
> Thanks,
> Elakiya
>
> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan  wrote:
>
>> Hi, elakiya.
>>
>> The upsert-kafka connector will read the primary keys from the Kafka
>> message keys. We cannot define the fields in the Kafka message values as
>> the primary key.
>>
>> Best,
>> Hang
>>
>> elakiya udhayanan wrote on Mon, Jul 10, 2023 at 13:49:
>>
>>> Hi team,
>>>
>>> I have a Kafka topic named employee which uses confluent avro schema and
>>> will emit the payload as below:
>>>
>>> {
>>> "employee": {
>>> "id": "123456",
>>> "name": "sampleName"
>>> }
>>> }
>>> I am using the upsert-kafka connector to consume the events from the
>>> above Kafka topic as below using the Flink SQL DDL statement, also here I
>>> want to use the id field as the Primary key. But I am unable to use the id
>>> field since it is inside the object.
>>>
>>> DDL Statement:
>>> String statement = "CREATE TABLE Employee (\r\n" +
>>> "  employee  ROW(id STRING, name STRING\r\n" +
>>> "  ),\r\n" +
>>> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>> ") WITH (\r\n" +
>>> "  'connector' = 'upsert-kafka',\r\n" +
>>> "  'topic' = 'employee',\r\n" +
>>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>> "  'key.format' = 'raw',\r\n" +
>>> "  'value.format' = 'avro-confluent',\r\n" +
>>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
>>> +
>>> ")";
>>> Any help is appreciated TIA
>>>
>>> Thanks,
>>> Elakiya
>>>
>>


Re: Query on Flink SQL primary key for nested field

2023-07-10 Thread Hang Ruan
Hi, elakiya.

The upsert-kafka connector will read the primary keys from the Kafka
message keys. We cannot define the fields in the Kafka message values as
the primary key.
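
One possible workaround, sketched under a loud assumption: the producer also
writes the employee id as the Kafka message key (a raw string). In that case
the key could be declared as a top-level column and used as the primary key,
while the nested `employee` row is read from the value only. The class name,
the top-level `id` column and the `EXCEPT_KEY` choice are my assumptions, not
something confirmed for your topic, so please treat this as untested.

```java
// Builds a DDL string in which the primary key is a top-level column read
// from the Kafka message key (assumption: the key carries the employee id).
final class EmployeeDdlSketch {

    static String createTableStatement() {
        return String.join("\n",
                "CREATE TABLE Employee (",
                "  id STRING,",
                "  employee ROW<id STRING, name STRING>,",
                "  PRIMARY KEY (id) NOT ENFORCED",
                ") WITH (",
                "  'connector' = 'upsert-kafka',",
                "  'topic' = 'employee',",
                "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',",
                "  'key.format' = 'raw',",
                "  'value.format' = 'avro-confluent',",
                "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',",
                // Keep the key column out of the value schema.
                "  'value.fields-include' = 'EXCEPT_KEY'",
                ")");
    }

    public static void main(String[] args) {
        System.out.println(createTableStatement());
    }
}
```

The string could then be passed to `tEnv.executeSql(...)` as in your code.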

Best,
Hang

elakiya udhayanan wrote on Mon, Jul 10, 2023 at 13:49:

> Hi team,
>
> I have a Kafka topic named employee which uses confluent avro schema and
> will emit the payload as below:
>
> {
> "employee": {
> "id": "123456",
> "name": "sampleName"
> }
> }
> I am using the upsert-kafka connector to consume the events from the above
> Kafka topic as below using the Flink SQL DDL statement, also here I want to
> use the id field as the Primary key. But I am unable to use the id field
> since it is inside the object.
>
> DDL Statement:
> String statement = "CREATE TABLE Employee (\r\n" +
> "  employee  ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
> +
> ")";
> Any help is appreciated TIA
>
> Thanks,
> Elakiya
>


Re: Recursive Split Detection + same split optimization

2023-07-10 Thread Hang Ruan
Hi, Benoit.

A split enumerator is responsible for discovering the source splits and
assigning them to the readers. It seems that your connector discovers
splits in the TM and assigns them in the JM.

I think there are 2 choices:
1. If you need the enumerator to assign splits, you have to send the events
about the splits between the source reader and the enumerator.
2. If you can make use of the subtaskId and let every reader read some
scope of the IDs, the enumerator is useless for you.
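
A minimal sketch of choice 2 in plain Java, assuming the IDs are strings and
that every reader knows its own subtask index and the source parallelism (in
a FLIP-27 reader the index should be available from
`SourceReaderContext#getIndexOfSubtask()`). The class and method names are
hypothetical.

```java
// Maps every ID to exactly one reader subtask so that no coordination
// through the enumerator is needed.
final class SplitOwnershipSketch {

    // floorMod keeps the result in [0, parallelism) even for negative hashes.
    static int ownerOf(String id, int parallelism) {
        return Math.floorMod(id.hashCode(), parallelism);
    }

    // A reader processes an ID only if it owns it and skips it otherwise.
    static boolean isOwnedBy(String id, int subtaskIndex, int parallelism) {
        return ownerOf(id, parallelism) == subtaskIndex;
    }

    public static void main(String[] args) {
        for (String id : new String[] {"ID1", "ID1.1", "ID1.2"}) {
            System.out.println(id + " -> subtask " + ownerOf(id, 4));
        }
    }
}
```

Note that the mapping changes when the parallelism changes, so this only
works as-is if rescaling is handled separately.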

I am not sure whether you could move split discovery back to the
enumerator, e.g. by using multiple threads there. Doing it in the TM may be
awkward and error-prone.

> Finally, I have a second problem which is about avoiding extracting
> multiple times the same split. We can imagine, based on my previous
> explanation, that same ID might be detected through multiple parent splits.
> To avoid losing time doing the same job multiple times, I need to avoid
> extracting the same ID.
> Actually, I am thinking about storing the already extracted ID into the
> state and storing it into my state backend. What do you think about this ?
>

For choice 1, put the information in the enumerator's state.
For choice 2, no need to consider that issue.
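
For choice 1, the bookkeeping in the enumerator could look roughly like the
following plain-Java sketch. The class is hypothetical; in a real enumerator
the set would be part of the checkpoint returned from
`SplitEnumerator#snapshotState` and restored when the enumerator is
recreated.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Tracks the IDs that were already handed out, so that an ID reported by
// several parent splits is scheduled only once.
final class SeenSplitTracker {

    private final Set<String> seen = new HashSet<>();

    // Returns only the IDs not seen before, in the order they were reported,
    // and remembers them.
    List<String> filterNew(Collection<String> reportedIds) {
        List<String> fresh = new ArrayList<>();
        for (String id : reportedIds) {
            if (seen.add(id)) {
                fresh.add(id);
            }
        }
        return fresh;
    }

    // Copy of the current state, e.g. to be stored in an enumerator checkpoint.
    Set<String> snapshot() {
        return new HashSet<>(seen);
    }

    public static void main(String[] args) {
        SeenSplitTracker tracker = new SeenSplitTracker();
        System.out.println(tracker.filterNew(List.of("ID1.1", "ID1.2")));
        System.out.println(tracker.filterNew(List.of("ID1.2", "ID1.3")));
    }
}
```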

Best,
Hang

Benoit Tailhades wrote on Mon, Jul 10, 2023 at 12:59:

> Hello Everyone,
>
> I am trying to implement a custom source where split detection is an
> expensive operation and I would like to benefit from the split reader
> results to build my next splits.
>
> Basically, my source is taking as input an id from my external system,
> let's name it ID1.
>
> From ID1, I can get a list of other sub splits but getting this list is an
> expensive operation so I want it to be done on a task manager during the
> split reading of ID1. Now we can imagine sub splits of ID1 are ID1.1 and
> ID1.2.
> So, to sum up my split reader of ID1 will be responsible for:
> 1. Collecting content of ID1
> 2. Producing n sub splits
> Then, the split enumerator will receive these sub splits and schedule
> ID1.1, ... ID1.n for split reading.
>
> As of now, I have implemented this mechanism using events between split
> reader and split enumerator but I think there might be a better
> architecture using Flink.
>
> Finally, I have a second problem which is about avoiding extracting
> multiple times the same split. We can imagine, based on my previous
> explanation, that same ID might be detected through multiple parent splits.
> To avoid losing time doing the same job multiple times, I need to avoid
> extracting the same ID.
> Actually, I am thinking about storing the already extracted ID into the
> state and storing it into my state backend. What do you think about this ?
>
> Thank you.
>
> Benoit
>
>


Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-06 Thread Hang Ruan
Hi, Leonard.

I would like to help to add this page. Please assign this issue to me.
Thanks.

Best,
Hang

Leonard Xu wrote on Fri, Jul 7, 2023 at 11:26:

> Congrats to all !
>
> It will be helpful to promote Apache Flink if we can add a page to our
> website like others[2]. I’ve created an issue[1] to improve this.
>
>
> Best,
> Leonard
>
> [1] https://issues.apache.org/jira/browse/FLINK-32555
> [2] https://spark.apache.org/news/sigmod-system-award.html
>


Re: Difference between different values for starting offset

2023-07-04 Thread Hang Ruan
Hi, Oscar.

Yes, you are right.

When starting from a checkpoint or savepoint, the Kafka connector always
uses the offsets stored in the state.
When starting without a checkpoint or savepoint, the Kafka connector uses
the specified startup mode.
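
The precedence can be written down as a small decision function. This is a
toy model in plain Java, not connector code: the enum and all names are made
up, and COMMITTED_OR_EARLIEST stands for what
`OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)` is
expected to do.

```java
import java.util.OptionalLong;

// Toy model of which offset a partition starts from.
final class StartOffsetSketch {

    enum Initializer { EARLIEST, LATEST, COMMITTED_OR_EARLIEST }

    static long resolve(OptionalLong restoredFromState,
                        OptionalLong committedInKafka,
                        long earliest,
                        long latest,
                        Initializer initializer) {
        // An offset restored from a checkpoint or savepoint always wins.
        if (restoredFromState.isPresent()) {
            return restoredFromState.getAsLong();
        }
        // Without Flink state, the configured initializer decides.
        switch (initializer) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            case COMMITTED_OR_EARLIEST:
                return committedInKafka.orElse(earliest);
            default:
                throw new IllegalArgumentException("unknown initializer: " + initializer);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.of(42), 0, 100,
                Initializer.COMMITTED_OR_EARLIEST));
    }
}
```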

Best,
Hang

Oscar Perez via user wrote on Tue, Jul 4, 2023 at 20:54:

> Hei,
> Ok, thanks. so if I understand this correctly the difference between
> OffsetInitializer.earliest and commitedOffset(OffsetResetStrategy.EARLIEST)
> will be in the case that there is no flink state. In this case, earliest
> will not check kafka committed offset and start from earliest while in the
> latter will use the committed offset from kafka if there is any, is that
> right? In either case if the committed offset is in flink state that will
> take precedence and will be used in either case right?
>
> Thanks,
> Oscar
>
> On Tue, 4 Jul 2023 at 02:56, Mason Chen  wrote:
>
>> Hi Oscar,
>>
>> You are correct about the OffsetInitializer being only effective when
>> there is no Flink state--in addition, if you have partition discovery on,
>> this initializer will be reused for the new partitions (i.e. splits)
>> discovered. Assuming the job is continuing from the offset in Flink state,
>> there is no difference between the two strategies. This is because the
>> `auto.offset.reset` maps to the `OffsetResetStrategy` and
>> OffsetInitializer.earliest uses `earliest` too.
>>
>> Best,
>> Mason
>>
>> On Mon, Jul 3, 2023 at 6:56 AM Oscar Perez via user <
>> user@flink.apache.org> wrote:
>>
>>> Hei,
>>>
>>> Looking at the flink documentation for kafkasource I see the following
>>> values for starting offset:
>>>
>>> OffsetInitializer.earliest
>>> OffsetInitializer.latest
>>> OffsetInitializer.commitedOffset(OffsetResetStrategy.EARLIEST)
>>>
>>> From what I understand OffsetInitializer.earliest uses earliest offset
>>> the first time but later deployments will use the committed offset in the
>>> flink state to resume from there. If that is the case what is the
>>> difference between OffsetInitializer.earliest and
>>> commitedOffset(OffsetResetStrategy.EARLIEST) if both continue from the
>>> committed offset after redeployment?
>>>
>>> Thanks!
>>> Oscar
>>>
>>


Re: Part files generated in reactive mode

2023-07-04 Thread Hang Ruan
Hi, Wang Mengxi,

You should use the file compaction feature[1] to compact the small files.

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/#compaction

Wang, Mengxi X via user wrote on Tue, Jul 4, 2023 at 22:44:

> Hi,
>
>
>
> We want to process one 2GB file and the output should also be a single 2GB
> file, but after we enabled reactive mode it generated several hundred small
> output files instead of one 2GB file. Can anybody help please?
>
>
>
> Best wishes,
>
> Mengxi Wang
>
>
>
> This message is confidential and subject to terms at:
> https://www.jpmorgan.com/emaildisclaimer including on confidential,
> privileged or legal entity information, malicious content and monitoring of
> electronic messages. If you are not the intended recipient, please delete
> this message and notify the sender immediately. Any unauthorized use is
> strictly prohibited.
>


Re: Unsubscribe

2023-07-04 Thread Hang Ruan
Please send an email with any content to user-unsubscr...@flink.apache.org
if you want to unsubscribe from the user@flink.apache.org mailing list. You
can refer to [1][2] for more details on managing your subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

Ajinkya Pathrudkar wrote on Tue, Jul 4, 2023 at 20:22:

> Unsubscribe --
> Thanks & Regards,
> Ajinkya Pathrudkar
>


Re: Unsubscribe

2023-07-04 Thread Hang Ruan
Please send an email with any content to user-unsubscr...@flink.apache.org
if you want to unsubscribe from the user@flink.apache.org mailing list. You
can refer to [1][2] for more details on managing your subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

tan yao wrote on Tue, Jul 4, 2023 at 15:51:

> Unsubscribe
>
> Get Outlook for iOS
>


Re: Unsubscribe

2023-07-04 Thread Hang Ruan
Please send an email with any content to user-unsubscr...@flink.apache.org
if you want to unsubscribe from the user@flink.apache.org mailing list. You
can refer to [1][2] for more details on managing your subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

Victor Villa Dev wrote on Wed, Jul 5, 2023 at 07:15:

>


Re: Unsubscribe

2023-07-04 Thread Hang Ruan
Please send an email with any content to user-unsubscr...@flink.apache.org
if you want to unsubscribe from the user@flink.apache.org mailing list. You
can refer to [1][2] for more details on managing your subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

liyuhang wrote on Wed, Jul 5, 2023 at 10:51:

> Unsubscribe
>
>


Re: Unsubscribe

2023-07-04 Thread Hang Ruan
Please send an email with any content to user-unsubscr...@flink.apache.org
if you want to unsubscribe from the user@flink.apache.org mailing list. You
can refer to [1][2] for more details on managing your subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

Bauddhik Anand wrote on Tue, Jul 4, 2023 at 16:02:

> Unsubscribe
>


Re: how to get blackhole connector jar

2023-06-28 Thread Hang Ruan
Hi, longfeng,

I checked the BlackHole connector documentation[1]; BlackHole is a
built-in connector.
If you cannot find this connector in your Flink distribution, check whether
the `flink-table-api-java-bridge` jar contains the
`BlackHoleTableSinkFactory`[2].

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/blackhole.html
[2]
https://github.com/apache/flink/blob/release-1.12/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BlackHoleTableSinkFactory.java

longfeng Xu wrote on Wed, Jun 28, 2023 at 16:09:

> hi guys,
>   when using Alibaba Flink (based on Flink 1.12) to run Nexmark's
> query0, it failed because blackhole is not a supported sink connector.
> So how can I upload a blackhole connector, like the nexmark connector?
> Where can I download it?
>
> thanks
>


Re: Unsubscribe

2023-06-24 Thread Hang Ruan
Please send an email with any content to user-unsubscr...@flink.apache.org
if you want to unsubscribe from the user@flink.apache.org mailing list. You
can refer to [1][2] for more details on managing your subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On Fri, Jun 23, 2023 at 14:18, the original poster wrote:

>


Re: Admin Client Configs

2023-06-24 Thread Hang Ruan
Hi, Razin.

You can pass these Kafka configurations using the 'properties.*' format. For
example, 'properties.allow.auto.create.topics' = 'false'.
See the Kafka connector docs [1] for more.
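
To illustrate the 'properties.*' convention, here is a minimal Java sketch. It
is not the connector's code; it only mimics the documented behavior of
stripping the 'properties.' prefix before the key is handed to the Kafka
client. The class name `KafkaPropertiesSketch`, the method
`toKafkaProperties`, and the timeout and backoff values are made up for this
example. Whether a given key is honored by the admin client used for
partition discovery is something you would still need to verify.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaPropertiesSketch {
    private static final String PREFIX = "properties.";

    // Collects every option that starts with "properties." and strips the prefix,
    // so 'properties.retry.backoff.ms' becomes the Kafka key 'retry.backoff.ms'.
    public static Properties toKafkaProperties(Map<String, String> tableOptions) {
        Properties kafkaProps = new Properties();
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            if (option.getKey().startsWith(PREFIX)) {
                String kafkaKey = option.getKey().substring(PREFIX.length());
                kafkaProps.setProperty(kafkaKey, option.getValue());
            }
        }
        return kafkaProps;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka"); // not forwarded: no 'properties.' prefix
        options.put("properties.allow.auto.create.topics", "false");
        options.put("properties.request.timeout.ms", "60000"); // hypothetical value
        options.put("properties.retry.backoff.ms", "1000");    // hypothetical value
        System.out.println(toKafkaProperties(options));
    }
}
```

Options without the prefix, such as 'connector', stay on the Flink side and
are not forwarded.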

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#properties

Razin Bouzar via user wrote on Thu, Jun 22, 2023 at 02:21:

> Looking for some guidance on overriding Kafka admin client configs. The
> partition discovery feature sometimes fails to connect with kafka and we'd
> like to implement retries. There is no clear documentation on which
> settings need to be changed? Admin client configs of note:
>
>- default.api.timeout.ms -- applies for client ops which do not
>specify a timeout param.
>- reconnect.backoff.max
>- request.timeout.ms
>- retries
>- retry.backoff.ms
>
>


Re: Unsubscribe

2023-06-12 Thread Hang Ruan
Please send an email to user-unsubscr...@flink.apache.org to unsubscribe

Best,
Hang

Yu voidy wrote on Mon, Jun 12, 2023 at 11:39:

>
>


Re: Custom Counter on Flink File Source

2023-06-07 Thread Hang Ruan
Hi, Kirti.

You can find this information on the 1.18 release wiki page [1].

The timeline is as follows:
Feature Freeze: July 11, 2023, end of business CEST
Release: End of September 2023

Best,
Hang

[1]
https://cwiki.apache.org/confluence/display/FLINK/1.18+Release#id-1.18Release-Summary

Kirti Dhar Upadhyay K wrote on Wed, Jun 7, 2023 at 15:49:

> Thanks Hang.
>
> Any expected date for Flink 1.18.0 release?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>
> *From:* Hang Ruan 
> *Sent:* 07 June 2023 07:34
> *To:* Kirti Dhar Upadhyay K 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Custom Counter on Flink File Source
>
>
>
> Hi, Kirti Dhar Upadhyay K.
>
>
>
> I check the FLIP-274[1]. This issue will be released in the 1.18.0. It is
> not contained in any release now.
>
>
>
> Best,
>
> Hang
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator
>
>
>
> Kirti Dhar Upadhyay K wrote on Wed, Jun 7, 2023 at 02:51:
>
> Hi Hang,
>
>
>
> Thanks for reply.
>
> I tried using SplitEnumeratorContext passed in
> AbstractFileSource#createEnumerator but resulted as NullPointerException.
>
> As SplitEnumeratorContext provides its implementation as
> SourceCoordinatorContext having metricGroup() as below-
>
>
>
>
>
> @Override
> public SplitEnumeratorMetricGroup metricGroup() {
>     return null;
> }
>
>
>
> Am I doing any mistake?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>
> *From:* Hang Ruan 
> *Sent:* 06 June 2023 08:12
> *To:* Kirti Dhar Upadhyay K 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Custom Counter on Flink File Source
>
>
>
> Hi, Kirti Dhar Upadhyay K.
>
>
>
> We could get the metric group from the context, like `SourceReaderContext`
> and `SplitEnumeratorContext`. These contexts could be found when creating
> readers and enumerators. See `AbstractFileSource#createReader` and
> `AbstractFileSource#createEnumerator`.
>
>
>
> Best,
>
> Hang
>
>
>
> Kirti Dhar Upadhyay K via user wrote on Mon, Jun 5, 2023 at 22:57:
>
> Hi Community,
>
>
>
> I am trying to add a new counter for number of files collected on Flink
> File Source.
>
> Referring the doc
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
> understand how to add a new counter on any operator.
>
>
>
> this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
>
>
>
> But not able to get this RuntimeContext on FileSource.
>
> Can someone give some clue on this?
>
>
>
> Regards,
>
> Kirti Dhar
>
>


Re: Custom Counter on Flink File Source

2023-06-06 Thread Hang Ruan
Hi, Kirti Dhar Upadhyay K.

I checked FLIP-274 [1]. This feature will be released in 1.18.0; it is not
included in any release yet.

Best,
Hang

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator

Kirti Dhar Upadhyay K wrote on Wed, Jun 7, 2023 at 02:51:

> Hi Hang,
>
>
>
> Thanks for reply.
>
> I tried using SplitEnumeratorContext passed in
> AbstractFileSource#createEnumerator but resulted as NullPointerException.
>
> As SplitEnumeratorContext provides its implementation as
> SourceCoordinatorContext having metricGroup() as below-
>
>
>
>
>
> @Override
> public SplitEnumeratorMetricGroup metricGroup() {
>     return null;
> }
>
>
>
> Am I doing any mistake?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>
> *From:* Hang Ruan 
> *Sent:* 06 June 2023 08:12
> *To:* Kirti Dhar Upadhyay K 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Custom Counter on Flink File Source
>
>
>
> Hi, Kirti Dhar Upadhyay K.
>
>
>
> We could get the metric group from the context, like `SourceReaderContext`
> and `SplitEnumeratorContext`. These contexts could be found when creating
> readers and enumerators. See `AbstractFileSource#createReader` and
> `AbstractFileSource#createEnumerator`.
>
>
>
> Best,
>
> Hang
>
>
>
> Kirti Dhar Upadhyay K via user wrote on Mon, Jun 5, 2023 at 22:57:
>
> Hi Community,
>
>
>
> I am trying to add a new counter for number of files collected on Flink
> File Source.
>
> Referring the doc
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
> understand how to add a new counter on any operator.
>
>
>
> this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
>
>
>
> But not able to get this RuntimeContext on FileSource.
>
> Can someone give some clue on this?
>
>
>
> Regards,
>
> Kirti Dhar
>
>


Re: flink14 sql sink kafka error

2023-06-05 Thread Hang Ruan
Hi, 湘晗刚,

This error seems to come from the Kafka server. You should check whether
the Kafka server itself is reporting any errors.
Alternatively, please provide more details about the request. The
information given so far is too brief to analyze.

Best,
Hang

湘晗刚 <1016465...@qq.com> wrote on Mon, Jun 5, 2023 at 15:08:

> UnknownServerException :The server experienced an unexpected error when
> processing the reqiest.
> Thanks
> Kobe24
>


Re: Custom Counter on Flink File Source

2023-06-05 Thread Hang Ruan
Hi, Kirti Dhar Upadhyay K.

You can get the metric group from a context such as `SourceReaderContext`
or `SplitEnumeratorContext`. These contexts are available when readers and
enumerators are created. See `AbstractFileSource#createReader` and
`AbstractFileSource#createEnumerator`.

Best,
Hang

Kirti Dhar Upadhyay K via user wrote on Mon, Jun 5, 2023 at 22:57:

> Hi Community,
>
>
>
> I am trying to add a new counter for number of files collected on Flink
> File Source.
>
> Referring the doc
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
> understand how to add a new counter on any operator.
>
>
>
> this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
>
>
>
> But not able to get this RuntimeContext on FileSource.
>
> Can someone give some clue on this?
>
>
>
> Regards,
>
> Kirti Dhar
>


Re: SupportsReadingMetadata flow between table transformations

2023-05-30 Thread Hang Ruan
Hi, Yuval.

`SupportsReadingMetadata` lets connectors append metadata columns to the
schema of table sources. However, a downstream operator does not know which
source column a given column maps to.
We could try to generate this information when parsing Flink SQL, but it is
hard to generate it for the DataStream API.

Best,
Hang

Yuval Itzchakov wrote on Tue, May 30, 2023 at 12:27:

>
> Hi,
> I am looking for a way to propagate a column all the way from the source
> to the sink, without the knowledge of whoever is applying transformations
> on the tables.
>
> I looked into SupportsReadingMetadata, and saw that it is able to "tag"
> tables with metadata. I was wondering, does this tag flow between the
> different projections and joins the table goes through? If not, what could
> be alternatives to achieve flowing some column A between all table
> transformations such that I can provide some "metadata" context that is
> coming in from the source itself?
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Reading KafkaSource state from a savepoint using the State Processor API

2023-05-24 Thread Hang Ruan
Hi, Charles,

I usually read the state in debug mode. I set a breakpoint at the return
statement in `SavepointReader#read`.
Then I can find the state I need in the field `SavepointMetadataV2
savepointMetadata`.
Finally, I deserialize the state bytes with
`KafkaPartitionSplitSerializer`.

Best,
Hang

Charles Tan wrote on Wed, May 24, 2023 at 06:27:

> Hi everyone,
>
> I have a few questions about reading KafkaSource state using the State
> Processor API. I have a simple Flink application which reads from a Kafka
> topic then produces to a different topic. After running the Flink job and
> stopping it with a savepoint, I then write a few more records to the input
> topic. When the job is resumed from this savepoint, it reads records from
> the position it left off, indicating that the job successfully used the
> savepoint to recover its position. When I inspect the savepoint file with
> the state processor API, I can read the "SourceReaderState" from the
> savepoint. However, the state is read as a Java byte array and I can't
> decode it or make any sense of it. I want to be able to read the savepoint
> state to find out exactly how much progress (partition/offset) a job has
> made in case it fails or is stopped.
>
> Does anyone have any ideas how I can deserialize the bytes from the Kafka
> source state or more generically how to read the Kafka source operator
> state from a savepoint?
>
> Here is the link to a github repository that contains the Flink job that I
> was running, a savepoint file, and the code I was using to try to read the
> savepoint. (https://github.com/charles-tan/flink-state-processor-example)
>
> Thanks,
> Charles
>


Re: flink1.16.1 jdbc-3.1.0-1.16 There is a problem trying left join

2023-05-10 Thread Hang Ruan
Hi, yangxueyong,

The filter (WHERE condition) is pushed down to the source if the connector
implements the `SupportsFilterPushDown` interface.
In your case, the SQL planner determined that the records read from
`test_flink_res1` must satisfy the conditions (`name` =
'abc0.11317691217472489') and (`id` IS NULL). These filters are pushed down
to the source.

Best,
Hang

Shammon FY wrote on Wed, May 10, 2023 at 14:15:

> Hi Yangxueyong,
>
> Are you sure this is your Flink SQL job? This SQL statement looks very
> strange, the table 'test_flink_res2' is both source and sink, and the join
> key is null.
>
> Best,
> Shammon FY
>
> On Wed, May 10, 2023 at 12:54 PM yangxueyong 
> wrote:
>
>> flink1.16.1
>>
>> mysql8.0.33
>>
>> jdbc-3.1.0-1.16
>>
>>
>> I have a sql,
>>
>> insert into test_flink_res2(id,name,address)
>> select a.id,a.name,a.address from test_flink_res1 a left join
>> test_flink_res2 b on a.id=b.id where a.name='abc0.11317691217472489' and
>> b.id is null;
>>
>> *Why does flinksql convert this statement into the following statement?*
>>
>> SELECT `address` FROM `test_flink_res1` WHERE ((`name` =
>> 'abc0.11317691217472489')) AND ((`id` IS NULL))
>>
>> *As a result, there is no data in test_flink_res2,why?*
>>
>>
>>
>>


Re: StatsdMetricsReporter is emitting all metric types as gauges

2023-05-10 Thread Hang Ruan
Hi, Iris,

Yes, the Flink counter is cumulative. It has `inc` and `dec` methods.
Since the counter value has already been calculated in Flink, we do not need
a statsd counter to calculate it again.

Best,
Hang

Iris Grace Endozo wrote on Wed, May 10, 2023 at 14:53:

> Hey Hang,
>
> Thanks for the prompt response. Does this mean the Flink counters emitted
> via statsd are cumulative then? From the spec
> https://github.com/b/statsd_spec#counters
>
>
> > A counter is a gauge calculated at the server. Metrics sent by the
> client increment or decrement the value of the gauge rather than giving its
> current value.
>
> This means that counters are not monotonic and work like deltas that are
> aggregated on the server side.
>
> Cheers, Iris.
>
> --
>
> *Iris Grace Endozo,* Senior Software Engineer
> *M *+61 435 108 697
> *E* iris.end...@gmail.com
> On 10 May 2023 at 1:02 PM +1000, Hang Ruan ,
> wrote:
>
> Hi, Iris,
>
> The metrics have already be calculated  in Flink. So we only need to
> report these metric as the gauges.
> For example, the metric `metricA` is a Flink counter and is increased from
> 1 to 2. The statsd gauge will be 2 now. If we register it as a statsd
> counter, we will send 1 and 2 to the statsd counter. The statsd counter
> will be 3, which is a wrong result.
>
> Best,
> Hang
>
> Iris Grace Endozo wrote on Tue, May 9, 2023 at 19:19:
>
>> Hey folks trying to troubleshoot why counter metrics are appearing as
>> gauges on my end. Is it expected that the StatsdMetricsReporter is
>> reporting gauges for counters as well?
>>
>> Looking at this one:
>> https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L207:
>> the statsd specifications state that counters need to be reported as
>> :|c[|@] but it seems it's defaulting to
>> "%s:%s|g" in the above. Ref: https://github.com/b/statsd_spec#counters
>>
>> Wondering if anyone else has hit this issue or there's an existing issue?
>>
>> Cheers, Iris.
>>
>> --
>>
>> *Iris Grace Endozo,* Senior Software Engineer
>> *M *+61 435 108 697
>> *E* iris.end...@gmail.com
>>
>


Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread Hang Ruan
Hi, amenreet,

As Hangxiang said, you should use a new checkpoint dir if the new job has
the same jobId as the old one. Otherwise, do not use a fixed jobId, so that
the checkpoint dirs do not conflict.
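
To make the conflict concrete, here is a small Java sketch of the layout
visible in the stack trace (`<checkpoint dir>/<jobId>/chk-<n>/_metadata`). It
only builds paths and is not Flink code. The class `CheckpointPathSketch`,
the method `metadataPath`, the job ID, and the second directory are
hypothetical names chosen for the example.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class CheckpointPathSketch {

    // Mirrors the layout seen in the exception: <dir>/<jobId>/chk-<n>/_metadata
    public static Path metadataPath(String checkpointDir, String jobId, long checkpointId) {
        return Paths.get(checkpointDir, jobId, "chk-" + checkpointId, "_metadata");
    }

    public static void main(String[] args) {
        String fixedJobId = "my-fixed-job-id"; // hypothetical fixed job ID

        // Old job and new job: same dir, same job ID, and both start at checkpoint 1.
        Path oldJob = metadataPath("/opt/flink/pm/checkpoint", fixedJobId, 1);
        Path newJob = metadataPath("/opt/flink/pm/checkpoint", fixedJobId, 1);
        System.out.println("same target file: " + oldJob.equals(newJob));

        // Using a different checkpoint dir (or a different job ID) avoids the clash.
        Path newDir = metadataPath("/opt/flink/pm/checkpoint-v2", fixedJobId, 1);
        System.out.println("still clashes:    " + oldJob.equals(newDir));
    }
}
```

A job that starts without the previous state numbers its checkpoints from 1
again, which is why it runs into the `_metadata` file left by the old job.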

Best,
Hang

Hangxiang Yu wrote on Wed, May 10, 2023 at 10:35:

> Hi,
> I guess you used a fixed JOB_ID, and configured the same checkpoint dir as
> before ?
> And you may also start the job without before state ?
> The new job cannot know anything about before checkpoints, that's why the
> new job will fail when it tries to generate a new checkpoint.
> I'd like to suggest you to use different JOB_ID for different jobs, or set
> a different checkpoint dir for a new job.
>
> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi  wrote:
>
>> Hi all,
>>
>> Is there any way to prevent restart of flink job, or override the
>> checkpoint metadata, if for some reason there exists a checkpoint by same
>> name. I get the following exception and my job restarts, have been trying
>> to find solution for a very long time but havent found anything useful yet,
>> other than manually cleaning.
>>
>> 2023-02-27 10:00:50,360 WARN  
>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>> [] - Failed to trigger or complete checkpoint 1 for job
>> 6e6b1332. (0 consecutive failed attempts so far)
>>
>> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
>> finalize checkpoint.
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> [?:?]
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> [?:?]
>>
>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>
>> Caused by: java.io.IOException: Target file
>> file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
>> already exists.
>>
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:64)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> ... 7 more
>>
>> 2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobMaster
>> [] - Error while processing AcknowledgeCheckpoint message
>>
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> finalize the pending checkpoint 1. Failure reason: Failure to finalize
>> checkpoint.
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1381)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> [?:?]
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> [?:?]
>>
>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>
>> Caused 

Re: StatsdMetricsReporter is emitting all metric types as gauges

2023-05-09 Thread Hang Ruan
Hi, Iris,

The metrics have already been calculated in Flink, so we only need to report
them as gauges.
For example, the metric `metricA` is a Flink counter and is increased from
1 to 2. The statsd gauge will be 2 now. If we registered it as a statsd
counter, we would send 1 and then 2 to the statsd counter. The statsd counter
would then be 3, which is the wrong result.
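
The arithmetic can be checked with a tiny Java simulation. This is only a
sketch of the two statsd semantics (a gauge keeps the last value, a counter
sums what it receives); it does not talk to a statsd server, and the class
and method names are invented for the example.

```java
public class StatsdCounterVsGauge {

    // A statsd gauge keeps only the most recently reported value.
    public static long gaugeAfter(long[] reportedValues) {
        long current = 0;
        for (long value : reportedValues) {
            current = value;
        }
        return current;
    }

    // A statsd counter treats every reported value as a delta and sums them.
    public static long counterAfter(long[] reportedValues) {
        long total = 0;
        for (long value : reportedValues) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        // The Flink counter `metricA` goes from 1 to 2, so the reporter sends 1, then 2.
        long[] reported = {1, 2};
        System.out.println("gauge   = " + gaugeAfter(reported));   // prints "gauge   = 2"
        System.out.println("counter = " + counterAfter(reported)); // prints "counter = 3"
    }
}
```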

Best,
Hang

Iris Grace Endozo wrote on Tue, May 9, 2023 at 19:19:

> Hey folks trying to troubleshoot why counter metrics are appearing as
> gauges on my end. Is it expected that the StatsdMetricsReporter is
> reporting gauges for counters as well?
>
> Looking at this one:
> https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L207:
> the statsd specifications state that counters need to be reported as
> :|c[|@] but it seems it's defaulting to
> "%s:%s|g" in the above. Ref: https://github.com/b/statsd_spec#counters
>
> Wondering if anyone else has hit this issue or there's an existing issue?
>
> Cheers, Iris.
>
> --
>
> *Iris Grace Endozo,* Senior Software Engineer
> *M *+61 435 108 697
> *E* iris.end...@gmail.com
>


Re: OffsetCommitMode.Kafka_periodic with checkpointing enabled

2023-05-09 Thread Hang Ruan
Hi, Pritam,

I see Martijn has responded on the ticket.

Kafka source (FLIP-27) can commit offsets in two places: the Kafka consumer's
auto commit, and a call to `consumer.commitAsync` when a checkpoint completes.
- If checkpointing is enabled and commit.offsets.on.checkpoint = true, the
Kafka connector commits offsets when a checkpoint completes.
- If properties.enable.auto.commit = true, the Kafka consumer auto-commits
offsets periodically (the interval depends on
`properties.auto.commit.interval.ms`).

If checkpointing is enabled and both commit.offsets.on.checkpoint = true and
properties.enable.auto.commit = true, I think the offsets will be committed
in both places.
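
The rules above can be written down as a small truth table in Java. This is
only a restatement of what is described in this mail, not code from the Kafka
connector; the enum `CommitPath` and the method `activePaths` are
hypothetical names.

```java
import java.util.EnumSet;
import java.util.Set;

public class OffsetCommitPaths {

    public enum CommitPath { ON_CHECKPOINT, KAFKA_AUTO_COMMIT }

    // Returns which of the two commit paths would be active for the given settings.
    public static Set<CommitPath> activePaths(
            boolean checkpointingEnabled,
            boolean commitOffsetsOnCheckpoint,
            boolean enableAutoCommit) {
        EnumSet<CommitPath> paths = EnumSet.noneOf(CommitPath.class);
        if (checkpointingEnabled && commitOffsetsOnCheckpoint) {
            paths.add(CommitPath.ON_CHECKPOINT);     // commitAsync on checkpoint completion
        }
        if (enableAutoCommit) {
            paths.add(CommitPath.KAFKA_AUTO_COMMIT); // periodic commit by the Kafka consumer
        }
        return paths;
    }

    public static void main(String[] args) {
        // Checkpointing on, commit on checkpoint off, auto commit on.
        System.out.println(activePaths(true, false, true));
        // Everything on: both paths are active.
        System.out.println(activePaths(true, true, true));
    }
}
```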

Best,
Hang

Pritam Agarwala wrote on Tue, May 9, 2023 at 14:24:

> Hi Team,
>
> I need to get kafka-lag to prepare a graph and its dependent on kafka
> committed offset. Flink is updating the offsets only after checkpointing to
> make it consistent.
>
> Default Behaviour as per doc :
> If checkpoint is enabled, but consumer.setCommitOffsetsOnCheckpoints set
> to false, then offset will not be committed at all even if the
> enable.auto.commit is set to true.
>
> So, when consumer.setCommitOffsetsOnCheckpoints set to false, *shouldn't
> it fall back on the enable.auto.commit to do offset commit regularly since*
>  *in any case flink doesn't use consumer committed offsets for recovery.*
>
>
> Jira Ticket : https://issues.apache.org/jira/browse/FLINK-32038
>
> Thanks & Regards,
> Pritam Agarwala
> Senior Data Engineer
>


Re: Flink Sql erroring at runtime

2023-05-08 Thread Hang Ruan
Hi, neha,

I think the error is caused by the deserialization. Could you share some
example data and runnable SQL to reproduce the problem?

Best,
Hang

neha goyal wrote on Tue, May 2, 2023 at 16:33:

> Hello,
>
> I am using Flink 1.16.1 and observing a different behavior from Flink
> 1.13.6.
>
> SELECT if(some_string_field is null, 'default', 'some_string_field') from
> my_stream
>
> This SQL flink job in the streaming environment is erroring out during
> runtime with the exception mentioned below. There are no null values sent
> and it is failing for the nonnull values as well.
>
> It is running fine in Flink 1.13.6. Also, if I use the Integer field, it
> runs fine.
> Was there any change around this in Flink 14/15/16?
>
> io.IOException: Failed to deserialize consumer record due to
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
> at org.apache.flink.streaming.runtime.io
> .StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:750)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
> at
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
> ... 14 more
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
> at
> org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> ... 22 more
> Caused by: java.lang.NullPointerException
> at
> StreamExecCalc$53548.processElement_trueFilter10044_split10048(Unknown
> Source)
> at StreamExecCalc$53548.processElement_trueFilter10044(Unknown Source)
> at StreamExecCalc$53548.processElement_split10047(Unknown Source)
> at StreamExecCalc$53548.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> ... 28 more
>

Re: Query on flink-parquet

2023-05-08 Thread Hang Ruan
Hi, Anuj,

Classes annotated with @Internal or @Experimental can be changed between any
two releases. Classes annotated with @PublicEvolving can only be changed
between minor releases (for example, between 1.17.0 and 1.18.0).
So the classes you mentioned may change. If the API changes in a new
release, you will have to modify your code to use that release.

Best,
Hang

Anuj Jain wrote on Mon, May 8, 2023 at 13:28:

>
>> Hi Community,
>>
>>
>> I am trying to use flink-parquet for reading and writing parquet files
>> from the Flink filesystem connectors.
>>
>> In File source, I would be decoding parquet files and converting them to
>> avro records and similarly in file sink i would be encoding avro records to
>> parquet files.
>>
>>
>> For collection i am using
>>
>> BulkFormat bulkFormat =
>> new
>> StreamFormatAdapter<>(AvroParquetReaders.forSpecificRecord(recordClazz));
>> FileSource source = FileSource.forBulkFileFormat(bulkFormat,
>> path).build();
>>
>>
>> and for sinking i am using
>>
>> FileSink sink = FileSink.forBulkFormat(path,
>> AvroParquetWriters.forSpecificRecord(recordClazz)).build()
>>
>>
>> Query: The StreamFormatAdapter class is marked @Internal and, 
>> AvroParquetReaders
>> and AvroParquetWriters classes are marked @Experimental – does it mean
>> that in future flink releases these classes can be changed in a
>> non-backward compatible way like plugging of any other 3PP rather than
>> “parquet-avro” or changing the API structure thus impacting the application
>> code ?
>>
>> Would it be safe to use the code as specified above ?
>>
>>
>> Thanks and Regards
>>
>> Anuj
>>
>


Re: Can flink1.15.2 use flink sql to create a broadcast table? I didn't find anything related in https://flink.apache.org/

2023-05-06 Thread Hang Ruan
Hi, yangxueyong,

If the ratio is not modified frequently, we could set a long TTL for the
cache in the lookup table to reduce how often it is read, or put a cache in
the UDF.
If you need the exact ratio at the moment the data arrives, we have to read
it from the remote storage each time new data comes in.
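
As a rough idea of what "a cache in the UDF" could look like, here is a
minimal TTL cache in plain Java. It is a sketch under assumptions: the class
`CachedRatio` is hypothetical, the `loader` stands in for whatever reads the
ratio from MySQL or Redis, and the clock is injected only to make the
behavior easy to test. A real UDF would wrap something like this and call
`get()` for each record.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class CachedRatio {
    private final Supplier<Double> loader; // hypothetical: reads the ratio from remote storage
    private final long ttlMillis;
    private final LongSupplier clock;      // returns the current time in milliseconds
    private Double cached;
    private long loadedAt;

    public CachedRatio(Supplier<Double> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Returns the cached ratio, reloading it once the TTL has expired.
    public synchronized double get() {
        long now = clock.getAsLong();
        if (cached == null || now - loadedAt >= ttlMillis) {
            cached = loader.get();
            loadedAt = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        // Reload at most once per minute; 0.001 is the "one thousandth" rebate from the thread.
        CachedRatio ratio = new CachedRatio(() -> 0.001, 60_000L, System::currentTimeMillis);
        System.out.println(100 * ratio.get());
    }
}
```

The trade-off is the one described above: a longer TTL means fewer remote
reads, but a changed ratio takes up to one TTL to become visible.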

Best,
Hang

yangxueyong wrote on Sat, May 6, 2023 at 18:08:

> Thank you very much for your reply. We have also thought about this
> solution, but our transaction volume is very large, and the TPS reaches
> thousands. We are worried that reading the database will become a
> performance bottleneck. I think we can also read redis, but you still have
> Any other options?
>
> On May 6, 2023 at 17:54, Hang Ruan wrote:
>
> Hi, yxy,
>
> I think this scenario could be resolved by a lookup join or a UDF. We can
> store the ratio in the mysql table. Then we could read it by a lookup join
> or implement a UDF to read the ratio.
>
> Best,
> Hang
>
> yxy wrote on Sat, May 6, 2023 at 15:14:
>
>> Hello, we have a business scenario.  We have a real-time process to
>> calculate how much red envelopes should be given to them for each
>> transaction.  For example, if a customer pays $100, we will give him a
>> rebate of one thousandth.  We currently use flinksql to Realize this
>> function, but we found that flinksql cannot dynamically adjust this ratio.
>> We want to know can flinksql implement broadcast tables like this?
>> Broadcast the ratio?
>
>


Re: Can flink1.15.2 use flink sql to create a broadcast table? I didn't find anything related in https://flink.apache.org/

2023-05-06 Thread Hang Ruan
Hi, yxy,

I think this scenario could be handled with a lookup join or a UDF. We can
store the ratio in a MySQL table, then read it through a lookup join or
implement a UDF that reads the ratio.

Best,
Hang

yxy wrote on Sat, May 6, 2023 at 15:14:

> Hello, we have a business scenario.  We have a real-time process to
> calculate how much red envelopes should be given to them for each
> transaction.  For example, if a customer pays $100, we will give him a
> rebate of one thousandth.  We currently use flinksql to Realize this
> function, but we found that flinksql cannot dynamically adjust this ratio.
> We want to know can flinksql implement broadcast tables like this?
> Broadcast the ratio?


Re: Issue when Running the flink-kuberenetes-operator sql-runner (The file STDOUT does not exist on the TaskExecutor)

2023-05-04 Thread Hang Ruan
Hi, Nathan,

I think this previous discussion [1] will be helpful for you.

Best,
Hang

[1] https://lists.apache.org/thread/cgwsr6f1l3202ktwcvtyxtdsgj2vyms1

Nathan Moderwell wrote on Fri, May 5, 2023 at 03:54:

> Hi,
>
> I'm running the sql-runner example in the flink-kuberenetes-operator repo
> and hitting an error. I've tried building the image from different releases
> of the repo and that does not help. The pods run (status stays at Running)
> and it is able to build the execution graph successfully, however I see the
> error below (the important part seems to be `The file STDOUT does not exist
> on the TaskExecutor`). I've also tried building the python example that has
> a similar sql query and I get the same error. I'm running this example on
> minikube, building everything as-is from the OSS flink-kubernetes-operator
> repo. Any idea why I'm getting this error?
>
> Full Error Logs:
>
> 2023-05-04 19:36:29,983 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> orders[1] -> Sink: print_table[2] (1/1)
> (fa0a897ca85acd1d2b7719bb2c929d2f_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
> switched from SCHEDULED to DEPLOYING.
> 2023-05-04 19:36:30,092 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> Deploying Source: orders[1] -> Sink: print_table[2] (1/1) (attempt #0) with
> attempt id
> fa0a897ca85acd1d2b7719bb2c929d2f_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and
> vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to
> python-example-taskmanager-1-1 @ 10.244.0.40 (dataPort=39441) with
> allocation id fe50815410ace91f428c4aee34bce7ab
> 2023-05-04 19:36:30,388 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> orders[1] -> Sink: print_table[2] (1/1)
> (fa0a897ca85acd1d2b7719bb2c929d2f_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
> switched from DEPLOYING to INITIALIZING.
> 2023-05-04 19:36:30,496 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> orders[1] -> Sink: print_table[2] (1/1)
> (fa0a897ca85acd1d2b7719bb2c929d2f_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
> switched from INITIALIZING to RUNNING.
> 2023-05-04 19:39:23,567 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
> [] - Failed to transfer file from TaskExecutor
> python-example-taskmanager-1-1.
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: The file STDOUT does not exist on the
> TaskExecutor.
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> ~[?:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:261)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> ~[?:?]
> at
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1275)
> ~[flink-dist-1.16.1.jar:1.16.1]
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> ~[flink-rpc-akka_02bdeb39-d7bb-42a6-9466-c8eb2b8bfca2.jar:1.16.1]
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-rpc-akka_02bdeb39-d7bb-42a6-9466-c8eb2b8bfca2.jar:1.16.1]
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> ~[flink-rpc-akka_02bdeb39-d7bb-42a6-9466-c8eb2b8bfca2.jar:1.16.1]
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> ~[?:?]
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
> ~[flink-rpc-akka_02bdeb39-d7bb-42a6-9466-c8eb2b8bfca2.jar:1.16.1]
> at akka.dispatch.OnComplete.internal(Future.scala:299)
> 

Re: Apache Flink Kubernetes Operator 1.4.0

2023-04-27 Thread Hang Ruan
Hi, rania,

I think the quick start guide [1] will be helpful for you. More information
can be found in the operator documentation [2].

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/
[2] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/

rania duni wrote on Fri, Apr 28, 2023 at 01:10:

> Hello!
>
> As a newcomer to Flink and Kubernetes, I am seeking detailed instructions
> to help me properly configure and deploy this Flink example (
> https://github.com/apache/flink-kubernetes-operator/tree/main/examples/autoscaling)
> on a Minikube environment. Can you give me specific configurations and
> steps that I need to follow? Maybe it is a naive question, but I am new to
> all this.
>


Re: Why my flink sql job on yarn keep crash

2023-04-18 Thread Hang Ruan
Hi, Si-li,

I think this may not be the root cause. You should check whether there are
more exceptions in the JobManager (JM) and TaskManager (TM) logs.
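
When going through the logs, one quick way to narrow things down is to list
every `Caused by:` line. The sketch below is plain Python and only a
heuristic: it assumes the log text has already been read into a string, the
function names are made up for this example, and treating the last
`Caused by:` entry as the likely root cause is a rule of thumb, not something
Flink guarantees.

```python
import re

# Matches lines such as
#   "Caused by: java.lang.InterruptedException"
#   ">> Caused by: java.lang.RuntimeException: Error while getting state"
# Leading quote markers (">") and whitespace are skipped.
_CAUSE = re.compile(r"^[>\s]*Caused by:\s*([\w.$]+)(?::\s*(.*?))?\s*$")


def caused_by_chain(log_text):
    """Return (exception class, message) for every 'Caused by:' line, in order."""
    causes = []
    for line in log_text.splitlines():
        match = _CAUSE.match(line)
        if match:
            causes.append((match.group(1), match.group(2) or ""))
    return causes


def root_cause(log_text):
    """Return the last 'Caused by:' entry, or None if there is none."""
    chain = caused_by_chain(log_text)
    return chain[-1] if chain else None
```

Running `root_cause` over the JM log and over each TM log separately makes it
easier to see which process reported the innermost exception first.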

Best,
Hang

Shammon FY wrote on Tue, Apr 18, 2023 at 09:02:

> Hi Si-li
>
> Could you give some more detailed exceptions? Or you can check the metrics
> of your job such as memory usage.
>
> Best,
> Shammon FY
>
>
> On Fri, Apr 14, 2023 at 5:14 PM Si-li Liu  wrote:
>
>> My job reads data from MySQL and writes to Doris. It crashes 20 minutes
>> to 1 hour after it starts.
>>
>> org.apache.flink.runtime.JobException: Recovery is suppressed by
>> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10,
>> backoffTimeMS=1)
>> at org.apache.flink.runtime.executiongraph.failover.flip1.
>> ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
>> at org.apache.flink.runtime.executiongraph.failover.flip1.
>> ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler
>> .java:83)
>> at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(
>> DefaultScheduler.java:256)
>> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(
>> DefaultScheduler.java:247)
>> at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(
>> DefaultScheduler.java:240)
>> at org.apache.flink.runtime.scheduler.SchedulerBase
>> .onTaskExecutionStateUpdate(SchedulerBase.java:738)
>> at org.apache.flink.runtime.scheduler.SchedulerBase
>> .updateTaskExecutionState(SchedulerBase.java:715)
>> at org.apache.flink.runtime.scheduler.SchedulerNG
>> .updateTaskExecutionState(SchedulerNG.java:78)
>> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(
>> JobMaster.java:477)
>> at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>> .lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils
>> .runWithContextClassLoader(ClassLoadingUtils.java:83)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
>> AkkaRpcActor.java:307)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
>> AkkaRpcActor.java:222)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(
>> FencedAkkaRpcActor.java:84)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
>> AkkaRpcActor.java:168)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at akka.actor.Actor.aroundReceive(Actor.scala:537)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:
>> 1056)
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>> at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread
>> .java:175)
>> Caused by: java.lang.InterruptedException
>> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject
>> .reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
>> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject
>> .await(AbstractQueuedSynchronizer.java:2173)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(
>> TaskMailboxImpl.java:149)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:363)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .processMail(MailboxProcessor.java:352)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxLoop(MailboxProcessor.java:229)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
>> StreamTask.java:831)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
>> .java:780)
>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
>> Task.java:935)
>> at 

Re: Unsubscribe

2023-04-16 Thread Hang Ruan
To unsubscribe, please send any email to user-unsubscr...@flink.apache.org. For details, see
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

Best,
Hang

Wrote on Sat, Apr 15, 2023 at 11:57:

> Unsubscribe
>
>
>
>
> --
>
> Sent from the Sina Mail client
>


Re: java.lang.RuntimeException: Error while getting state

2023-04-14 Thread Hang Ruan
Hi, igyu,

It seems that the state of the join in the SQL job cannot be restored
correctly. Did you change the columns in the join SQL? If so, I think this may
be why recovery from the checkpoint fails.

Best,
Hang

igyu wrote on Fri, Apr 14, 2023 at 16:13:

> I have a Flink SQL job (with savepoints enabled).
> I wanted to change it, so I stopped it.
>
> I added a column to the Oracle tables used by the source and the sink,
> and modified the SQL in Flink.
>
> When I submitted it again, I got an error:
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
>   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>   at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>   at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>   at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>   at akka.actor.Actor.aroundReceive(Actor.scala:537)
>   at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>   at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
>   at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
>   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
> Caused by: java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
>   at 
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:168)
>   at 
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:154)
>   at 
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:65)
>   at 
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:95)
>   at 
> 

Re: Support of CSV to AVRO Converter in DataStream FileSource

2023-04-14 Thread Hang Ruan
Hi, Kirti,
I think you need a custom converter for your CSV files. The converters
provided by Flink only define how to translate the data into a Flink type.
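
As a rough illustration of what such a custom converter has to do, here is a
plain-Python sketch that does not use the Flink or Avro libraries. The field
names, the `CONVERTERS` table, and the helper functions are all hypothetical.
It relies on two facts from the Avro specification: a nullable field is a
union with null, and the `date` logical type is stored as an int counting
days since 1970-01-01. The decimal value is kept as an in-memory `Decimal`;
encoding it to Avro bytes is left out.

```python
import csv
import io
from datetime import date
from decimal import Decimal

EPOCH = date(1970, 1, 1)


def to_avro_date(text):
    """Convert an ISO date string to Avro's `date` form: days since the epoch."""
    return (date.fromisoformat(text) - EPOCH).days


def to_nullable(convert):
    """Wrap a converter so an empty CSV cell becomes None (a null union branch)."""
    def wrapper(text):
        return None if text == "" else convert(text)
    return wrapper


# Hypothetical record layout: one converter per CSV column.
CONVERTERS = {
    "id": int,
    "name": str,
    "signup_date": to_nullable(to_avro_date),
    "balance": to_nullable(Decimal),
}


def convert_rows(csv_text):
    """Yield one dict per CSV row, with each cell converted by its column rule."""
    reader = csv.DictReader(io.StringIO(csv_text))
    for row in reader:
        yield {field: conv(row[field]) for field, conv in CONVERTERS.items()}
```

In a real job the same per-column mapping would presumably sit in a map
function applied after the CSV records are read, producing the generated Avro
objects instead of dicts.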

Best,
Hang

Kirti Dhar Upadhyay K via user wrote on Fri, Apr 14, 2023 at 15:27:

> Hi Community,
>
>
>
> I am reading CSV data using the DataStream file source connector and need to
> convert it into Avro-generated specific objects.
>
> I am using CsvReaderFormat with CSVSchema, but this supports only Avro
> primitive types (and even those exclude null and bytes).
>
>
>
> Is there any support for Avro complex and logical types as well?
> I can see a few classes such as CsvToRowDataConverters and
> RowDataToAvroConverter, but they seem to be specific to the Table API.
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>

