Re: File compression with File sink

2023-04-14 Thread Hang Ruan
Hi, Anuj,

I searched Jira and found the related issue [1], but it is still open.

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-6185

Anuj Jain  于2023年4月14日周五 14:58写道:

> Hi Community,
>
> Does Flink File Sink support compression of output files, to reduce the
> file size?
> I think File source supports reading of compressed formats like gzip,
> bzip2 etc.; is there any way for sinking the files in compressed format ?
>
> Any help is appreciated.
>
> Regards
> Anuj
>


Re: Flink Job across Data Centers

2023-04-13 Thread Hang Ruan
Hi, Chirag,

I am not sure whether FLIP-268 [1] is what you want.

Best,
Hang

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-268%3A+Kafka+Rack+Awareness
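
As a hedged illustration of the rack-awareness idea (not the final FLIP-268 API, which may not be available in your Flink version): the standard Kafka consumer property client.rack can already be passed through the KafkaSource builder, assuming the brokers are configured with a rack-aware replica selector. The broker address, topic, group id and the RACK_ID environment variable below are placeholders.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class RackAwareSourceSketch {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker.dc1:9092")               // placeholder address
                .setTopics("events")                                   // placeholder topic
                .setGroupId("geo-replicated-job")                      // placeholder group id
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Prefer the closest replica; the brokers must use a rack-aware replica selector.
                .setProperty("client.rack", System.getenv("RACK_ID"))  // rack/zone of this TaskManager
                .build();
    }
}
```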

Andrew Otto  于2023年4月12日周三 22:12写道:

> Hi, I asked a similar question in this thread, which
> might have some relevant info.
>
> On Wed, Apr 12, 2023 at 7:23 AM Chirag Dewan via user <
> user@flink.apache.org> wrote:
>
>> Hi,
>>
>> Can anyone share any experience on running Flink jobs across data centers?
>>
>> I am trying to create a Multi site/Geo Replicated Kafka cluster. I want
>> that my Flink job to be closely colocated with my Kafka multi site cluster.
>> If the Flink job is bound to a single data center, I believe we will
>> observe a lot of client latency by trying to access the broker in another
>> DC.
>>
>> Rather if I can make my Flink Kafka collectors as rack aware and start
>> fetching data from the closest Kafka broker, I should get better results.
>>
>> I will be deploying Flink 1.16 on Kubernetes with Strimzi managed Apache
>> Kafka.
>>
>> Thanks.
>>
>>


Re: Flink job manager conditional start of flink jobs

2023-04-13 Thread Hang Ruan
Hi, naga,

I agree with Shammon's suggestion after reading the context. Maybe you
need a 'Job Management Service' to manage all jobs across the different
namespaces. I think this kind of job management is not suitable for
implementation in the Flink engine itself.

Best,
Hang
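
A hedged sketch of the check mentioned further down in the thread (querying the ns1 JobManager for running jobs before starting anything in ns2): the JobManager's REST API exposes GET /jobs with the status of each job. The service URL is a placeholder and the JSON handling is simplified for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class Ns1JobGuard {
    // Returns true if the ns1 JobManager reports at least one RUNNING job.
    public static boolean ns1HasRunningJobs() throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://flink-jobmanager.ns1:8081/jobs")) // placeholder URL
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        // The response lists job ids with their status; a real service should parse the JSON.
        return response.body().contains("\"status\":\"RUNNING\"");
    }
}
```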

Shammon FY  于2023年4月13日周四 11:34写道:

> Hi
>
> Does the job in ns2 have permission to stop the job in ns1? How about
> managing the relationship in your `Job Submission Service`, if it exists?
> The service can check and stop the job in ns1 before submitting the job
> to ns2. What do you think?
>
> Best,
> Shammon FY
>
>
> On Thu, Apr 13, 2023 at 10:50 AM naga sudhakar 
> wrote:
>
>> Hi,
>> Thanks for your reply.
>> It is slightly different, would be happy to have any suggestion  for the
>> scenario you mentioned.
>> My scenario: I have 2 namespaces say ns1,ns2. I have to make sure only
>> one of ns1 or ns2 should run my flink jobs. Say initially ns1 is running
>> flink jobs, later planned to move them to ns2. Now when I start in ns2, I
>> can make an api call to ns1 jobmanager about running jobs and if no jobs
>> then only I should start in ns2. I can introduce this logic inside the
>> flink job java main method  where my total streaming logic present. So if I
>> identify then, how can I stop the initiated job?
>>
>> Thanks,
>> Nagasudhakar.
>>
>> On Thu, 13 Apr, 2023, 7:08 am Shammon FY,  wrote:
>>
>>> Hi naga
>>>
>>> Could you provide a specific description of your scene? It sounds like
>>> your requirement requires a uniqueness check to ensure that there are no
>>> multiple identical jobs running simultaneously, right?
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Wed, Apr 12, 2023 at 4:08 PM naga sudhakar 
>>> wrote:
>>>
 Thanks for your email.
 I am looking more in terms of running these Flink jobs in a multi-namespace
 environment and making sure that only one namespace's Flink jobs are
 running.
 So on the JobManager, when I try to start a Flink job, it has to check
 whether it's allowed to run in this namespace or not, and accordingly the
 Flink job should turn into the running state; otherwise it should cancel by itself.



 On Wed, 12 Apr, 2023, 12:06 pm Gen Luo,  wrote:

> Hi,
>
> Is the job you want to start running or already finished?
> If the job is running, this is simply a failover or a JM failover
> case.
> While if the job has finished, there's no such feature that can
> restart the job
> automatically, AFAIK.  The job has to be submitted again.
>
> On Wed, Apr 12, 2023 at 10:58 AM naga sudhakar 
> wrote:
>
>> Hi Team,
>> Greetings!!
>> Just wanted to know when job manager or task manager is being
>> restarted, is there a way to run the existing flink jobs based on a
>> condition? Same query when I am starting flink job fresh also.
>>
>> Please let me know if any more information is required from my side.
>>
>> Thanks & Regards
>> Nagasudhakar  Sajja.
>>
>


Re: Quick question about flink document.

2023-04-09 Thread Hang Ruan
Hi, Dongwoo,

I think there is no problem with this part of the document. It describes
snapshotting operator state, which is checkpointing. The checkpoint is stored
via the configured checkpoint storage, which by default is the JobManager's memory.

Best,
Hang
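
A minimal sketch of the two settings being discussed here, assuming the Flink 1.13+ API (the HDFS path is a placeholder): the state backend decides where the working state lives on the TaskManager, while the checkpoint storage decides where the snapshots are written.

```java
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointStorageSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // Where working state lives on the TaskManager (heap here; RocksDB is the alternative).
        env.setStateBackend(new HashMapStateBackend());

        // Where completed snapshots are written; the default JobManagerCheckpointStorage keeps
        // them in the JobManager's memory, which is what the quoted sentence refers to.
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
    }
}
```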

Feng Jin  于2023年4月10日周一 00:32写道:

> Hi Dongwoo
>
>
> This can be quite confusing.
> Before Flink 1.13, Flink's statebackend was actually a hybrid concept that
> included three types of statebackends:
> *MemoryStateBackend*, *FsStateBackend*, and *RocksDBStateBackend*.
>
> The default *MemoryStateBackend* uses heap as the backend, and the state
> is stored in the JobManager.
>
>
> You can refer to this migration document for more information:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#migrating-from-legacy-backends
> .
>
>
> Best
> Feng
>
> On Sun, Apr 9, 2023 at 10:23 PM Dongwoo Kim 
> wrote:
>
>> Hi community, I’m new to flink and trying to learn about the concepts of
>> flink to prepare migrating heron application to flink.
>> I have a quick question about this flink document.
>> (
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/stateful-stream-processing/#snapshotting-operator-state
>> )
>>
>> What I understood is states are stored in configured state backend which
>> can be either task manager’s heap or rocksdb.
>> And snapshots of checkpoint is stored by default in job manager’s heap
>> and mostly in distributed file system.
>> But in the document it says like below and it is confusing to me. Isn’t
>> the second line talking about checkpoint storage or checkpoint backend? Not
>> state backend? Thanks in advance, enjoy your weekend!
>>
>> *"Because the state of a snapshot may be large, it is stored in a
>> configurable state backend
>> .
>> By default, this is the JobManager’s memory, but for production use a
>> distributed reliable storage should be configured (such as HDFS)” *
>>
>


Re: How to set rebalance in Flink SQL like the Streaming API?

2023-04-03 Thread Hang Ruan
Hi, hjw,

IMO, parallelism 1 is enough for your job if we do not consider
the sink. I do not know why you need to set the lookup join operator's
parallelism to 6.
The SQL planner decides the type of each edge for us and we cannot
change it.
Maybe you could share the execution graph to provide more information.

Best,
Hang

hjw  于2023年4月4日周二 00:37写道:

> For example, I create a Kafka source to subscribe to a topic that has
> one partition and set the default parallelism of the job to 6. The next
> operator after the Kafka source is a lookup join with a MySQL table. However, the
> relationship between the Kafka source and the lookup join operator is
> Forward, so only one subtask of the lookup join operator can receive data. I
> want to set the relationship between the Kafka source and the lookup join
> operator to rebalance so that all subtasks of the lookup join operator can
> receive data.
>
> Env:
> Flink version:1.15.1
>
>
> --
> Best,
> Hjw
>


Re: Access ExecutionConfig from new Source and Sink API

2023-04-03 Thread Hang Ruan
Hi, Christopher,

I think there is already a discussion about the ExecutionConfig for the new Sink API
in FLIP-287 [1]. What we actually need is a read-only ExecutionConfig for
the Source API and the Sink API.
Maybe we could continue to discuss this topic under FLIP-287.

Best,
Hang

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853

Yufan Sheng  于2023年4月3日周一 14:06写道:

> I agree with you. It's quite useful to access the ExecutionConfig in
> Source API. When I develop the flink-connector-pulsar. The only
> configuration that I can't access is the checkpoint configure which is
> defined in ExecutionConfig. I can switch the behavior automatically by
> the checkpoint switch. So I have to add more custom configurations for
> the Pulsar Source.
>
> On Mon, Apr 3, 2023 at 1:47 PM Christopher Lee 
> wrote:
> >
> > Hello,
> >
> > I'm trying to develop Flink connectors to NATS using the new FLIP-27 and
> FLIP-143 APIs. The scaffolding is more complicated than the old
> SourceFunction and SinkFunction, but not terrible. However I can't figure
> out how to access the ExecutionConfig under these new APIs. This was
> possible in the old APIs by way of the RuntimeContext of the
> AbstractRichFunction (which are extended by RichSourceFunction and
> RichSinkFunction).
> >
> > The reason I would like this is:  some interactions with external
> systems may be invalid under certain Flink job execution parameters.
> Consider a system like NATS which allows for acknowledgements of messages
> received. I would ideally acknowledge all received messages by the source
> connector during checkpointing. If I fail to acknowledge the delivered
> messages, after a pre-configured amount of time, NATS would resend the
> message (which is good in my case for fault tolerance).
> >
> > However, if a Flink job using these connectors has disabled
> checkpointing or made the interval too large, the connector will never
> acknowledge delivered messages and the NATS system may send the message
> again and cause duplicate data. I would be able to avoid this if I could
> access the ExecutionConfig to check these parameters and throw early.
> >
> > I know that the SourceReaderContext gives me access to the
> Configuration, but that doesn't handle the case where the
> execution-environment is set programatically in a job definition rather
> than through configuration. Any ideas?
> >
> > Thanks,
> > Chris
>


Re: org.apache.flink.table.api.ValidationException

2023-03-29 Thread Hang Ruan
Hi,

This error occurs when the data type cannot be extracted automatically. You could
read this part of the documentation for more details about user-defined data types [1].

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/types/#user-defined-data-types
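
For illustration, a hedged sketch of the "pass the required data type manually or allow RAW types" option from the exception message, assuming UserStatus is the user's own class (not defined here) and that it is used as a UDF parameter; the function name is made up.

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF: tell the type extraction to treat UserStatus as a RAW type
// instead of failing, as the exception message suggests.
public class StatusToString extends ScalarFunction {
    public String eval(
            @DataTypeHint(value = "RAW", bridgedTo = UserStatus.class) UserStatus status) {
        return status == null ? null : status.toString();
    }
}
```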


柒朵 <1303809...@qq.com> 于2023年3月29日周三 17:52写道:

>
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Could not extract a data type from 'class
> UserStatus'. Please pass the required data type manually or allow RAW types.
> --
> 柒朵
> 1303809...@qq.com
>
> 
>
>


Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 Thread Hang Ruan
Congratulations!

Best,
Hang

yu zelin  于2023年3月28日周二 10:27写道:

> Congratulations!
>
> Best,
> Yu Zelin
>
> 2023年3月27日 17:23,Yu Li  写道:
>
> Dear Flinkers,
>
>
>
> As you may have noticed, we are pleased to announce that Flink Table Store 
> has joined the Apache Incubator as a separate project called Apache 
> Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
> streaming data lake platform for high-speed data ingestion, change data 
> tracking and efficient real-time analytics, with the vision of supporting a 
> larger ecosystem and establishing a vibrant and neutral open source community.
>
>
>
> We would like to thank everyone for their great support and efforts for the 
> Flink Table Store project, and warmly welcome everyone to join the 
> development and activities of the new project. Apache Flink will continue to 
> be one of the first-class citizens supported by Paimon, and we believe that 
> the Flink and Paimon communities will maintain close cooperation.
>
>
> 亲爱的Flinkers,
>
>
> 正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache
> 孵化器独立孵化 [1] [2] [3]。新项目的名字是
> Apache 
> Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。
>
>
> 在这里我们要感谢大家对 Flink Table Store 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache Flink
> 将继续作为 Paimon 支持的主力计算引擎之一,我们也相信 Flink 和 Paimon 社区将继续保持密切合作。
>
>
> Best Regards,
> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>
> 致礼,
> 李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)
>
> [1] https://paimon.apache.org/
> [2] https://github.com/apache/incubator-paimon
> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
>
>
>


Re: Are metadata columns required to get declared in the table's schema?

2023-03-26 Thread Hang Ruan
Hi, Jie,

If you don't need these metadata columns, you don't need to declare them
for the table. Undeclared metadata columns will not be read from the source and
will not be written to the sink.
You can still query a table that has no metadata column declarations. It
depends on your requirements.

Best,
Hang
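
A minimal sketch of declaring and then querying a metadata column, following the CREATE TABLE documentation linked earlier in this thread; the table, topic and column names are placeholders (here the Kafka record timestamp is exposed as event_time).

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MetadataColumnSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Declaring the metadata column is what makes it readable; leaving it out
        // simply means the connector never reads that metadata.
        tableEnv.executeSql(
                "CREATE TABLE t ("
                        + "  id STRING,"
                        + "  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'my-topic',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'properties.group.id' = 'demo',"
                        + "  'scan.startup.mode' = 'earliest-offset',"
                        + "  'format' = 'json'"
                        + ")");

        tableEnv.executeSql("SELECT id, event_time FROM t").print();
    }
}
```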

Jie Han  于2023年3月26日周日 21:42写道:

> Thank you for your response.
> Actually I noticed that the doc says 'However, declaring a metadata column
> in a table’s schema is optional’.
> So, does it mean that we don’t need to declare it when we don't query it
> rather than we can query it without the declaration?
>
> Best,
> Jay
>
>
>
>
>
>
>


Re: Are metadata columns required to get declared in the table's schema?

2023-03-26 Thread Hang Ruan
ps : DDL I said is the CREATE TABLE statements.

Best,
Hang

Hang Ruan  于2023年3月26日周日 21:33写道:

> Hi, Jie,
>
> In Flink, if we want to access a metadata column, we need to declare it in
> the DDL.
> More details could be found here[1].
>
> Best,
> Hang
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/create/#columns
>
> Jie Han  于2023年3月26日周日 14:58写道:
>
>> Hi community, I want to query a metadata column from my table t. Do I
>> need to declare it in the table schema explicitly?
>>
>> In spark, metadata columns are *hidden* columns, which means we don’t
>> need to declare it in the table ddl, we only explicitly reference it in our
>> query. For instance, select *, _metadata from t.
>>
>>
>>


Re: Are metadata columns required to get declared in the table's schema?

2023-03-26 Thread Hang Ruan
Hi, Jie,

In Flink, if we want to access a metadata column, we need to declare it in
the DDL.
More details could be found here[1].

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/create/#columns

Jie Han  于2023年3月26日周日 14:58写道:

> Hi community, I want to query a metadata column from my table t. Do I
> need to declare it in the table schema explicitly?
>
> In spark, metadata columns are *hidden* columns, which means we don’t
> need to declare it in the table ddl, we only explicitly reference it in our
> query. For instance, select *, _metadata from t.
>
>
>


Re: Table API function and expression vs SQL

2023-03-25 Thread Hang Ruan
Hi,

I think the SQL approach is better. Flink SQL jobs can be easily shared with
others for debugging, and SQL is more suitable for unified stream-batch processing.
For the small number of jobs that cannot be expressed in SQL, we
fall back to the DataStream API.

Best,
Hang

ravi_suryavanshi.yahoo.com via user  于2023年3月24日周五
17:25写道:

> Hello Team,
> I need your advice on which method is recommended, considering that I don't want to
> change my query code when Flink is updated/upgraded to a higher
> version.
>
> Here I am seeking advice for writing the SQL using java code(Table API
> function and Expression) or using pure SQL.
>
> I am assuming that SQL will not have any impact if upgraded to the higher
> version.
>
> Thanks and Regards,
> Ravi
>


Re: [ANNOUNCE] Apache Flink 1.17.0 released

2023-03-25 Thread Hang Ruan
Thanks for the great work ! Congrats all!

Best,
Hang

Panagiotis Garefalakis  于2023年3月25日周六 03:22写道:

> Congrats all! Well done!
>
> Cheers,
> Panagiotis
>
> On Fri, Mar 24, 2023 at 2:46 AM Qingsheng Ren  wrote:
>
> > I'd like to say thank you to all contributors of Flink 1.17. Your support
> > and great work together make this giant step forward!
> >
> > Also like Matthias mentioned, feel free to leave us any suggestions and
> > let's improve the releasing procedure together.
> >
> > Cheers,
> > Qingsheng
> >
> > On Fri, Mar 24, 2023 at 5:00 PM Etienne Chauchot 
> > wrote:
> >
> >> Congrats to all the people involved!
> >>
> >> Best
> >>
> >> Etienne
> >>
> >> Le 23/03/2023 à 10:19, Leonard Xu a écrit :
> >> > The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.17.0, which is the first release for the Apache Flink
> 1.17
> >> series.
> >> >
> >> > Apache Flink® is an open-source unified stream and batch data
> >> processing framework for distributed, high-performing, always-available,
> >> and accurate data applications.
> >> >
> >> > The release is available for download at:
> >> > https://flink.apache.org/downloads.html
> >> >
> >> > Please check out the release blog post for an overview of the
> >> improvements for this release:
> >> >
> >>
> https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/
> >> >
> >> > The full release notes are available in Jira:
> >> >
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351585
> >> >
> >> > We would like to thank all contributors of the Apache Flink community
> >> who made this release possible!
> >> >
> >> > Best regards,
> >> > Qingsheng, Martijn, Matthias and Leonard
> >>
> >
>


Re: Is there a way to control the parallelism of auto-generated Flink operators of the FlinkSQL job graph?

2023-03-24 Thread Hang Ruan
Hi, Elkhan,

I think this is intended behavior. If the parallelism of an operator is
not specified, it will be the same as that of the previous operator instead of the
default parallelism.
Actually, the table planner does most of this work for us. There should not
be a way to modify the parallelism of every operator; after all, we don't
know which operators the plan will contain when we write the SQL.

Best,
Hang

Elkhan Dadashov  于2023年3月24日周五 14:14写道:

> Checking with the community again, if anyone explored this before.
>
> Thanks.
>
>
> On Fri, Mar 17, 2023 at 1:56 PM Elkhan Dadashov  >
> wrote:
>
> > Dear Flink developers,
> >
> > Wanted to check, if there is a way to control the parallelism of
> > auto-generated Flink operators of the FlinkSQL job graph?
> >
> > In Java API, it is possible to have full control of the parallelism of
> > each operator.
> >
> > On FlinkSQL some source and sink connectors support `source.parallelism`
> > and `sink.parallelism`, and the rest can be set via
> `default.parallelism`.
> >
> > In this particular scenario, enhancedEvents gets chained to the
> > KafkaSource operator. It can be separated by calling disableChain() on the
> > KafkaSource stream on the Kafka connector side, but even with chaining
> > disabled on the source stream, the `enhancedEvents` operator parallelism is
> > still set to 5 (same as the Kafka source operator parallelism), instead of 3
> > (which is the default parallelism):
> >
> > ```sql
> > SET 'parallelism.default' = '3';
> >
> > CREATE TABLE input_kafka_table
> > (
> > ...
> > ts AS TO_TIMESTAMP_LTZ(CAST(`timestamp` AS BIGINT),3),
> > WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
> > ) WITH (
> > 'connector' = 'kafka',
> > 'source.parallelism' = '5' // this is supported by cutomization of
> > kafka connector
> > ...
> > );
> >
> > CREATE TEMPORARY VIEW enhancedEvents AS (
> >  SELECT x, y
> >  FROM input_kafka_table, LATERAL TABLE(udf.doIt(x, y)
> > );
> >
> > CREATE TABLE other_table_source (...) WITH(...);
> > CREATE TABLE other_table_sink (...) WITH(...);
> >
> > BEGIN STATEMENT SET;
> >  INSERT into enhancedEventsSink (Select * from enhancedEvents);
> >  INSERT into other_table_sink (Select z from other_table_source );
> > END;
> > ```
> >
> > Is there a way to force override parallelism of auto-generated operators
> > for FlinkSQL pipeline?
> >
> > Or is this expected behavior of some operator's parallelism not assigned
> > from default parallelism but from another operator's parallelism?
> >
> > Want to understand if this is a bug or intended behavior.
> >
> > Thank you.
> >
> >
>


Re: Bootstrapping multiple state within same operator

2023-03-22 Thread Hang Ruan
Hi, David,
I also read the code for `SavepointWriter#withOperator`. The
transformations are stored in a `Map` whose key is the `OperatorID`, and I cannot
come up with a way to register multiple transformations for one
operator with the provided API.

Maybe we need a new type of `XXXStateBootstrapFunction` that can change several
states at one time.

Best,
Hang

David Artiga  于2023年3月22日周三 15:22写道:

> We are using the state processor API to
> bootstrap the state of some operators. It has been working fine until now,
> when we tried to bootstrap an operator that has both a keyed state and a
> broadcasted state. Seems the API does not provide a convenient method to
> apply multiple transformations on the same *uid...*
>
> Is there a way to do that and we just missed it? Any insights appreciated.
>
> Cheers,
> /David
>


Re: Unsubscribe

2023-03-21 Thread Hang Ruan
Hi, please send an email to user-unsubscr...@flink.apache.org to unsubscribe.

Best,
Hang

laxmi narayan  于2023年3月21日周二 15:26写道:

> Unsubscribe --
> Hi ,
>
>
>
> Thank you.
>


Re: subscribe

2023-03-15 Thread Hang Ruan
Please send an e-mail to user-subscr...@flink.apache.org to subscribe to
the Flink user mailing list.

Best,
Hang

mark  于2023年3月15日周三 22:07写道:

> subscribe
>


Re: Kafka sql with validation exception

2023-03-15 Thread Hang Ruan
Hi, Lasse,

I think you should first check the points Shammon mentioned.

Maybe you need to package with the maven-shade-plugin configured like below, and make
sure the files in `META-INF/services` are merged together.

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>


Best,
Hang

Shammon FY  于2023年3月15日周三 19:21写道:

> Hi Lasse
>
> I think you can first check whether there is a file
> `META-INF/services/org.apache.flink.table.factories.Factory` in your uber
> jar and there's
> `org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory`
> in the file. Flink would like to create table factory from that file.
> And then you can check whether your uber jar are in the classpath of flink
> cluster
>
> Best,
> Shammon FY
>
>
> On Wed, Mar 15, 2023 at 6:35 PM Lasse Nedergaard <
> lassenedergaardfl...@gmail.com> wrote:
>
>> Hi.
>>
>> I have a simple job creating a table from Kafka. It works perfect on my
>> local machine but when I build a Uber jar and use the official Flink image
>> I get a validation exception.
>>
>> Could not find any factory for identifier ‘Kafka’ that implements
>> org.Apache.Flink.table.dynamicTableFactory in the class path.
>>
>> The uber jar contains Flink-connector-kafka and
>> Flink-sql-connector-Kafka.
>>
>> I can see on my local machine it calls discovery factory in
>> flink-table-common but on my cluster it use flink-table-api-java-Uber.
>>
>> And the list of available identifiers doesn't contain 'Kafka' and
>> upsert-Kafka as it does on my local machine.
>>
>> Anyone has a clue where I should look for the problem?
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>


Re: Are the Table API Connectors production ready?

2023-03-13 Thread Hang Ruan
Hi, yuxia,
I would like to help to complete this task.

Best,
Hang

yuxia  于2023年3月13日周一 09:32写道:

> Yeah, you're right. We don't support filtering files with patterns, and
> actually we already have a Jira issue [1] for it.
> I intended to do this in the past, but didn't have much time. Anyone
> who is interested can take it over. We're
> happy to help review.
>
> [1] https://issues.apache.org/jira/browse/FLINK-17398
>
> Best regards,
> Yuxia
>
> --
> *From: *"User"
> *To: *"Yaroslav Tkachenko", "Shammon FY" <
> zjur...@gmail.com>
> *Cc: *"User"
> *Sent: *Monday, March 13, 2023, 12:36:46 AM
> *Subject: *Re: Are the Table API Connectors production ready?
>
> Thanks a lot, Yaroslav and Shammon.
> I want to use the Filesystem connector. I tried it, and it works well while it is
> running, but if the job is restarted, it processes all the files again.
>
> I could not find a move or delete option for files after they have been collected. Also,
> I could not find filtering using patterns.
>
> Pattern matching is required as different files exist in the same folder.
>
> Regards,
> Ravi
> On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY <
> zjur...@gmail.com> wrote:
>
>
> Hi Ravi
>
> Agree with Yaroslav and if you find any problems in use, you can create an
> issue in jira
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK . I have
> used kafka/jdbc/hive in production too, they work well.
>
> Best,
> Shammon
>
> On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko 
> wrote:
>
> Hi Ravi,
>
> All of them should be production ready. I've personally used half of them
> in production.
>
> Do you have any specific concerns?
>
> On Thu, Mar 9, 2023 at 9:39 AM ravi_suryavanshi.yahoo.com via user <
> user@flink.apache.org> wrote:
>
> Hi,
> Can anyone help me here?
>
> Thanks and regards,
> Ravi
>
> On Monday, 27 February, 2023 at 09:33:18 am IST,
> ravi_suryavanshi.yahoo.com via user  wrote:
>
>
> Hi Team,
>
>
> In Flink 1.16.0, we would like to use some of the Table API Connectors for
> production. Kindly let me know if the below connectors are production ready
> or only for testing purposes.
>
> Name                         | Version       | Source                             | Sink
> Filesystem                   |               | Bounded and Unbounded Scan, Lookup | Streaming Sink, Batch Sink
> Elasticsearch                | 6.x & 7.x     | Not supported                      | Streaming Sink, Batch Sink
> Opensearch                   | 1.x & 2.x     | Not supported                      | Streaming Sink, Batch Sink
> Apache Kafka                 | 0.10+         | Unbounded Scan                     | Streaming Sink, Batch Sink
> Amazon DynamoDB              |               | Not supported                      | Streaming Sink, Batch Sink
> Amazon Kinesis Data Streams  |               | Unbounded Scan                     | Streaming Sink
> Amazon Kinesis Data Firehose |               | Not supported                      | Streaming Sink
> JDBC                         |               | Bounded Scan, Lookup               | Streaming Sink, Batch Sink
> Apache HBase                 | 1.4.x & 2.2.x | Bounded Scan, Lookup               | Streaming Sink, Batch Sink
> Apache Hive
>
> Thanks and regards
>
>
>


Re: unsubscribe

2023-02-27 Thread Hang Ruan
Hi, please send an email to user-unsubscr...@flink.apache.org if you want to
unsubscribe from the user mailing list.

Best,
Hang

zhangjunjie  于2023年2月28日周二 00:11写道:

> unsubscribe
>
>
>


Re: Could savepoints contain in-flight data?

2023-02-13 Thread Hang Ruan
Hi Alexis,

If we change the operator uid and restart the job, the job will not start
successfully [1]. We have to use --allowNonRestoredState to start
it. This means that the state for the old uid will not be used by the
operator with the new uid, so I think the data in that state will be lost.

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#what-happens-if-i-delete-an-operator-that-has-state-from-my-job
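
For completeness, a minimal sketch of the setting this whole scenario hinges on: only with unaligned checkpoints enabled can in-flight buffers end up in a checkpoint, while savepoints stay aligned and never contain in-flight data.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 10s; unaligned checkpoints may persist in-flight buffers
        // under backpressure, which is what the uid question above is about.
        env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
}
```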

Alexis Sarda-Espinosa  于2023年2月13日周一 19:56写道:

> Hi Hang,
>
> Thanks for the confirmation. One follow-up question with a somewhat
> convoluted scenario:
>
>1. An unaligned checkpoint is created.
>2. I stop the job *without* savepoint.
>3. I want to start a modified job from the checkpoint, but I changed
>one of the operator's uids.
>
> If the operator whose uid changed had in-flight data as part of the
> checkpoint, it will lose said data after starting, right?
>
> I imagine this is not good practice, but it's just a hypothetical scenario
> I wanted to understand better.
>
> Regards,
> Alexis.
>
>
> Am Mo., 13. Feb. 2023 um 12:33 Uhr schrieb Hang Ruan <
> ruanhang1...@gmail.com>:
>
>> ps: the savepoint will also not contain in-flight data.
>>
>> Best,
>> Hang
>>
>> Hang Ruan  于2023年2月13日周一 19:31写道:
>>
>>> Hi Alexis,
>>>
>>> No, an aligned checkpoint will not contain in-flight data. An aligned
>>> checkpoint makes sure that the data before the barrier has been processed,
>>> so there is no need to store in-flight data for the checkpoint.
>>>
>>> I think these documents[1][2] will help you to understand it.
>>>
>>>
>>> Best,
>>> Hang
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpointing_under_backpressure/
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/concepts/stateful-stream-processing/#checkpointing
>>>
>>> Alexis Sarda-Espinosa  于2023年2月11日周六 06:00写道:
>>>
>>>> Hello,
>>>>
>>>> One feature of unaligned checkpoints is that the checkpoint barriers
>>>> can overtake in-flight data, so the buffers are persisted as part of the
>>>> state.
>>>>
>>>> The documentation for savepoints doesn't mention anything explicitly,
>>>> so just to be sure, will savepoints always wait for in-flight data to be
>>>> processed before they are completed, or could they also persist buffers in
>>>> certain situations (e.g. when there's backpressure)?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>


Re: Could savepoints contain in-flight data?

2023-02-13 Thread Hang Ruan
ps: the savepoint will also not contain in-flight data.

Best,
Hang

Hang Ruan  于2023年2月13日周一 19:31写道:

> Hi Alexis,
>
> No, an aligned checkpoint will not contain in-flight data. An aligned checkpoint
> makes sure that the data before the barrier has been processed, so there is
> no need to store in-flight data for the checkpoint.
>
> I think these documents[1][2] will help you to understand it.
>
>
> Best,
> Hang
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpointing_under_backpressure/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/concepts/stateful-stream-processing/#checkpointing
>
> Alexis Sarda-Espinosa  于2023年2月11日周六 06:00写道:
>
>> Hello,
>>
>> One feature of unaligned checkpoints is that the checkpoint barriers can
>> overtake in-flight data, so the buffers are persisted as part of the state.
>>
>> The documentation for savepoints doesn't mention anything explicitly, so
>> just to be sure, will savepoints always wait for in-flight data to be
>> processed before they are completed, or could they also persist buffers in
>> certain situations (e.g. when there's backpressure)?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Could savepoints contain in-flight data?

2023-02-13 Thread Hang Ruan
Hi Alexis,

No, an aligned checkpoint will not contain in-flight data. An aligned checkpoint
makes sure that the data before the barrier has been processed, so there is
no need to store in-flight data for the checkpoint.

I think these documents[1][2] will help you to understand it.


Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpointing_under_backpressure/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/concepts/stateful-stream-processing/#checkpointing

Alexis Sarda-Espinosa  于2023年2月11日周六 06:00写道:

> Hello,
>
> One feature of unaligned checkpoints is that the checkpoint barriers can
> overtake in-flight data, so the buffers are persisted as part of the state.
>
> The documentation for savepoints doesn't mention anything explicitly, so
> just to be sure, will savepoints always wait for in-flight data to be
> processed before they are completed, or could they also persist buffers in
> certain situations (e.g. when there's backpressure)?
>
> Regards,
> Alexis.
>
>


Re: I want to subscribe users' questions

2023-02-07 Thread Hang Ruan
Hi, guanyuan,

This document (
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list)
will be helpful.
Welcome!

Best,
Hang

guanyuan chen  于2023年2月7日周二 21:37写道:

> Hi,
> My name is Guanyuan Chen. I am a big data development engineer in the Tencent
> WeChat department, China. I have 4 years of experience in Flink development,
> and I want to subscribe to Flink's development news and gladly help others
> develop Flink jobs.
> Thanks a lot.
>



Re: Standalone cluster memory configuration

2023-02-02 Thread Hang Ruan
Hi, Theodor,

The description in
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#memory-configuration
may help you to configure the memory for Flink.

> Flink tries to shield users as much as possible from the complexity of
> configuring the JVM for data-intensive processing. In most cases, users
> should only need to set the values taskmanager.memory.process.size or
> taskmanager.memory.flink.size (depending on how the setup), and possibly
> adjusting the ratio of JVM heap and Managed Memory via
> taskmanager.memory.managed.fraction. The other options below can be used
> for performance tuning and fixing memory related errors.
>
I think maybe you should set the taskmanager.memory.process.size or
taskmanager.memory.flink.size, or increase the memory of the container.
Hope this helps!

Best,
Hang
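
A hedged sketch of what that could look like in the docker-compose file quoted below; the 2048m value is only an example and must fit within the container's memory limit.

```yaml
  taskmanager-01:
    image: flink:1.16.0
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        taskmanager.memory.process.size: 2048m
```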

Theodor Wübker  于2023年2月2日周四 23:37写道:

> Hello everyone,
>
> I have a standalone cluster running in a Docker swarm with a very simple
> docker-compose configuration [3].  When I run my job there with a
> parallelism greater than one, I get an out of memory error. Nothing out of
> the ordinary, so I wanted to increase the JVM heap. I did that by setting
> ‘taskmanager.memory.task.heap.size’ according to [1]. However the
> taskmanager would not start, throwing an Exception saying that this
> configuration clashes with the configured total process memory - even
> though I had not configured that at all. Due to this, I could also not set
> the total Flink memory.
> Now I wonder, why did the TM tell me that the total process memory is
> already configured? Also, in [2]  I read that the cluster should not even
> start when neither total Flink memory nor total process memory are
> specified - which, as you can see in my configuration, I have not done [3].
>
> Maybe someone can enlighten me, why it looks like I can’t configure the
> memory properly? Thanks :)
>
> -Theo
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#configure-heap-and-managed-memory
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/#configure-total-memory
>
> [3] The compose configuration:
>
>   jobmanager:
> image: flink:1.16.0
> command: jobmanager
> environment:
>   - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=2
>   - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: jobmanager
>
>
>   taskmanager-01:
> image: flink:1.16.0
> depends_on:
>   - jobmanager
> command: taskmanager
> environment:
>   - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: jobmanager
> taskmanager.numberOfTaskSlots: 2
>


Re: Slot sharing behavior in Flink cluster

2022-12-13 Thread Hang Ruan
Hi, Le Xu,

If the job is a streaming job, all tasks are scheduled before any
data can flow through the pipeline, and the tasks run in parallel.
I think the Execution Mode documentation [1] and FLIP-134 [2] will help you
understand the details.

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/#execution-behavior
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API


Le Xu  于2022年12月14日周三 02:56写道:

> Hello!
>
> I have a quick question about slot-sharing behavior specified at this link
> (
> https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/).
> My understanding would be each task slot in Flink cluster represents a
> sequentially running operator (and therefore can be seen as a thread). But
> it seems like each operator in Flink is modeled as a thread and many
> threads can be pinned to the same task slot. For a task slot that contains
> both task A and task B (thread A and thread B), could these two tasks run
> in parallel and effectively parallelize tasks assigned to a task slot?
>
> Thanks!
>
> Le
>


Re: Re: How to set disableChaining like streaming multiple INSERT statements in a StatementSet ?

2022-12-10 Thread Hang Ruan
Hi, Hjw,

Maybe this configuration is helpful.
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#pipeline-operator-chaining

Best,
Hang
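
A hedged sketch of applying that option in Flink 1.14: it disables chaining for the whole pipeline, not for a single INSERT of the StatementSet. The same effect can be achieved by setting pipeline.operator-chaining: false in flink-conf.yaml.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DisableChainingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Equivalent to pipeline.operator-chaining: false, applied to every operator
        // that the StatementSet translates to.
        env.disableOperatorChaining();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // ... register tables and build the StatementSet on tableEnv as before ...
    }
}
```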

hjw  于2022年12月8日周四 12:01写道:

> hi,yuxia.
>
> The DAG image of the job graph is attached to the email.
> As the code image shows, I create a Table from a DataStream. Then I create a
> StatementSet that contains three INSERT statements from the same Table.
>
>
> --
> Best,
> Hjw
>
>
> At 2022-12-08 10:04:39, "yuxia"  wrote:
>
> Could you please post the image of the running job graph in Flink UI?
>
> Best regards,
> Yuxia
>
> --
> *From: *"hjw"
> *To: *"User"
> *Sent: *Thursday, December 8, 2022, 12:05:00 AM
> *Subject: *How to set disableChaining like streaming multiple INSERT
> statements in a StatementSet?
>
> Hi,
> I create a StatementSet that contains multiple INSERT statements.
> I found that the multiple INSERT tasks will be organized into an operator
> chain when StatementSet.execute() is invoked.
> How can I set disableChaining, as in the streaming API, for multiple INSERT
> statements in a StatementSet?
>
> env:
> Flink version:1.14.4
>
>
>
> --
> Best,
> Hjw
>
>


Re: Flink KafkaSource still referencing deleted topic

2022-10-12 Thread Hang Ruan
Hi, Robert,

The allowNonRestoredState option should be passed on the command line like this:
./bin/flink run --detached --allowNonRestoredState

Best,
Hang

Robert Cullen  于2022年10月12日周三 23:13写道:

> I don't see AllowNonRestoredState in the configuration documentation.  How
> would it be passed to a job? On the command line like this:
>
> ./bin/flink run --detached -Dallownonrestoredstate=true ...
>
> On Tue, Oct 4, 2022 at 4:52 PM Martijn Visser 
> wrote:
>
>> Hi Mason,
>>
>> Definitely! Feel free to open a PR and ping me for a review.
>>
>> Cheers, Martijn
>>
>> On Tue, Oct 4, 2022 at 3:51 PM Mason Chen  wrote:
>>
>>> Hi Martjin,
>>>
>>> I notice that this question comes up quite often. Would this be a good
>>> addition to the KafkaSource documentation? I'd be happy to contribute to
>>> the documentation.
>>>
>>> Best,
>>> Mason
>>>
>>> On Tue, Oct 4, 2022 at 11:23 AM Martijn Visser 
>>> wrote:
>>>
 Hi Robert,

 Based on
 https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
 I think you'll need to change the UID for your KafkaSource and restart your
 job with allowNonRestoredState enabled.

 Best regards,

 Martijn

 On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen 
 wrote:

> We've changed the KafkaSource to ingest from a new topic but the old
> name is still being referenced:
>
> 2022-10-04 07:03:41org.apache.flink.util.FlinkException: Global failure 
> triggered by OperatorCoordinator for 'Source: Grokfailures' (operator 
> feca28aff5a3958840bee985ee7de4d3).  at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
>at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
>  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)Caused by: 
> org.apache.flink.util.FlinkRuntimeException: Failed to handle partition 
> splits change due to at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
>  at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
>  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>... 3 moreCaused by: java.lang.RuntimeException: Failed to get 
> topic metadata.  at 
> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
>at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
>  at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
>   at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)   
>at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>... 3 moreCaused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition.  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
>

Re: Regarding Connector Options - value.deserializer

2022-01-10 Thread Hang Ruan
Hi, Ronak,

We cannot set a specific 'value.deserializer' in the table options.
'key.deserializer' and 'value.deserializer' are always set to
'org.apache.kafka.common.serialization.ByteArrayDeserializer'.

If you want to implement a custom format, you could take a look at the code of
JsonFormatFactory.java in flink-formats/flink-json. The format will be
loaded via SPI.

Best,
Hang
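
For illustration, a minimal, untested sketch of such a format factory, assuming the table schema is a single STRING column that receives the raw value bytes; all names except the Flink interfaces are made up. The class also has to be listed in META-INF/services/org.apache.flink.table.factories.Factory so that 'value.format' = 'my-format' can find it.

```java
import java.util.Collections;
import java.util.Set;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;

public class MyFormatFactory implements DeserializationFormatFactory {

    @Override
    public String factoryIdentifier() {
        return "my-format"; // referenced as 'value.format' = 'my-format'
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context sourceContext, DataType producedDataType) {
                return new RawStringDeserializer(
                        sourceContext.createTypeInformation(producedDataType));
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    /** Toy runtime decoder: exposes the raw value bytes as a single STRING column. */
    private static final class RawStringDeserializer implements DeserializationSchema<RowData> {
        private final TypeInformation<RowData> producedTypeInfo;

        private RawStringDeserializer(TypeInformation<RowData> producedTypeInfo) {
            this.producedTypeInfo = producedTypeInfo;
        }

        @Override
        public RowData deserialize(byte[] message) {
            return GenericRowData.of(StringData.fromBytes(message));
        }

        @Override
        public boolean isEndOfStream(RowData nextElement) {
            return false;
        }

        @Override
        public TypeInformation<RowData> getProducedType() {
            return producedTypeInfo;
        }
    }
}
```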

Ronak Beejawat (rbeejawa)  于2022年1月10日周一 17:51写道:

> Hi Hang,
>
>
>
> My question is: can we use a specific 'value.deserializer' in the table options
> via the Kafka connector, or is there no way to do so? I have already kept
> 'value.format' in the code snippet below, so is that enough, with the
> deserializer handled internally?
> How do I create a custom format? Can you please share a link to a sample
> example?
> example for the same  ?
>
>
>
> Thanks
>
> Ronak Beejawat
>
>
>
>
>
>
>
> *From:* Hang Ruan 
> *Sent:* Monday, January 10, 2022 3:06 PM
> *To:* d...@flink.apache.org; Ronak Beejawat (rbeejawa) 
> *Cc:* commun...@flink.apache.org; user@flink.apache.org
> *Subject:* Re: Regarding Connector Options - value.deserializer
>
>
>
> Hi, Ronak,
>
>
>
> I think you should implement a custom format by yourself instead of
> overriding the deserializer. 'value.format' is a required table option.
>
>
>
> Best,
>
> Hang
>
>
>
> Ronak Beejawat (rbeejawa)  于2022年1月10日周一 17:09
> 写道:
>
> Hi Team,
>
> Is there any way we use value.deserializer in Connector Options from kafka
> via sql api?
>
> PFB the code snippet below:
>
> tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
>  + "   org_id STRING\r\n"
>  + "   ,cluster_id STRING\r\n"
>  + "   ,globalcallid_callmanagerid STRING\r\n"
>  + "   ,globalcallid_callid INT\r\n"
>  + "   ,callidentifier INT\r\n"
>  + ",varvqmetrics STRING\r\n"
>  + ",duration INT\r\n"
>  + "   )\r\n"
>  + "   WITH (\r\n"
>  + "   'connector' = 'kafka'\r\n"
>  + "   ,'topic' = 'cmr'\r\n"
>  + "   ,'properties.bootstrap.servers' = '
> b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092
> '\r\n"
>  + "   ,'scan.startup.mode' = 'earliest-offset'\r\n"
>  + "   ,'properties.value.deserializer' = 'json'\r\n"
>  + "   ,'value.format' = 'json'\r\n"
>  + "   )");
>
>
> Thanks
> Ronak Beejawat
>
>


Re: Regarding Connector Options - value.deserializer

2022-01-10 Thread Hang Ruan
Hi, Ronak,

I think you should implement a custom format by yourself instead of
overriding the deserializer. 'value.format' is a required table option.

Best,
Hang

Ronak Beejawat (rbeejawa)  于2022年1月10日周一
17:09写道:

> Hi Team,
>
> Is there any way we use value.deserializer in Connector Options from kafka
> via sql api?
>
> PFB the code snippet below:
>
> tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
>  + "   org_id STRING\r\n"
>  + "   ,cluster_id STRING\r\n"
>  + "   ,globalcallid_callmanagerid STRING\r\n"
>  + "   ,globalcallid_callid INT\r\n"
>  + "   ,callidentifier INT\r\n"
>  + ",varvqmetrics STRING\r\n"
>  + ",duration INT\r\n"
>  + "   )\r\n"
>  + "   WITH (\r\n"
>  + "   'connector' = 'kafka'\r\n"
>  + "   ,'topic' = 'cmr'\r\n"
>  + "   ,'properties.bootstrap.servers' = '
> b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092
> '\r\n"
>  + "   ,'scan.startup.mode' = 'earliest-offset'\r\n"
>  + "   ,'properties.value.deserializer' = 'json'\r\n"
>  + "   ,'value.format' = 'json'\r\n"
>  + "   )");
>
>
> Thanks
> Ronak Beejawat
>


Re: Problem running test cases against the Kafka connector source code

2021-12-19 Thread Hang Ruan
You should first publish the modules to your local Maven repository with mvn install; only then can this dependency be found.

Yuepeng Pan  于2021年12月17日周五 20:28写道:

> Hi, Chen.
>  If you are running in IDEA, you can try checking the scope of the dependencies in the pom.
>
>
>
> Best,
> Roc.
>
>
>
>
>
> 在 2021-12-17 17:41:32,"陈卓宇" <2572805...@qq.com.INVALID> 写道:
> >Hello community:
> >
> >I am running the test cases for the Kafka connector part of the Flink source code.
> >
> >Error log:
> >
> >[ERROR]
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReaderTest
> Time elapsed: 1.398 s <<< ERROR!
> >java.lang.NoClassDefFoundError:
> org/apache/flink/networking/NetworkFailuresProxy
> > at
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createProxy(KafkaTestEnvironment.java:241)
> > at
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:434)
> > at
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:136)
> > at
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:165)
> > at
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:152)
> > at
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:115)
> > at
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:107)
> > at
> org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv.setup(KafkaSourceTestEnv.java:59)
> > at
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReaderTest.setup(KafkaPartitionSplitReaderTest.java:87)
> > at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> > at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at
> java.base/java.lang.reflect.Method.invoke(Method.java:566)
> > at
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> > at
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> > at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> > at
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> > at
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:126)
> > at
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeAllMethod(TimeoutExtension.java:68)
> > at
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> > at
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> > at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> > at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> > at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> > at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> > at
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
> > at
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
> > at
> org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllMethods$11(ClassBasedTestDescriptor.java:397)
> > at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> > at
> org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllMethods(ClassBasedTestDescriptor.java:395)
> > at
> org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:209)
> > at
> org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:80)
> > at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:148)
> > at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> > at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
> > at
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> > at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
> > at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> > at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
> > at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
> > at
> 

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-08 Thread Hang Ruan
There is no way to end the Kafka stream from the deserializer.

When would you want to end the stream? Could you explain why you need to
end the Kafka stream without using offsets?

Ayush Chauhan  于2021年12月8日周三 15:29写道:

>
> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>
>
>
> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger 
> wrote:
>
>> Hi Ayush,
>>
>> I couldn't find the documentation you've mentioned. Can you send me a
>> link to it?
>>
>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan 
>> wrote:
>>
>>> Hi,
>>>
>>> Can you please let me know the alternatives of isEndOfStream() as now
>>> according to docs this method will no longer be used to determine the end
>>> of the stream.
>>>
>>> --
>>>  Ayush Chauhan
>>>  Data Platform
>>>  [image: mobile-icon]  +91 9990747111
>>>
>>>
>>> This email is intended only for the person or the entity to whom it is
>>> addressed. If you are not the intended recipient, please delete this email
>>> and contact the sender.
>>>
>>
>
> --
>  Ayush Chauhan
>  Data Platform
>  [image: mobile-icon]  +91 9990747111
>
>
> This email is intended only for the person or the entity to whom it is
> addressed. If you are not the intended recipient, please delete this email
> and contact the sender.
>


Re: enable.auto.commit=true and checkpointing turned on

2021-12-05 Thread Hang Ruan
Hi,

1. Yes, the Kafka source will use the committed offset of the group id to
start the job.

2. No, auto.offset.reset is a Kafka consumer config option that defines what
to do when there is no initial offset in Kafka or if the current offset no
longer exists on the server. If the offset exists on the server, the consumer
will still start from the committed offset.

PS: If you have enabled checkpointing, there is no need to enable
enable.auto.commit. The offsets will be committed to Kafka when checkpoints
complete, which is the default behavior.
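
A minimal sketch of expressing the behavior described above explicitly with the new KafkaSource (available since Flink 1.12; addresses and names are placeholders): start from the group's committed offsets, and only fall back to LATEST when no committed offset exists, so the reset strategy never overrides an existing offset.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class CommittedOffsetSourceSketch {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("my-topic")                   // placeholder
                .setGroupId("my-group")                  // placeholder
                // Use the committed offset if one exists; LATEST only applies when it does not.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
```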

Vishal Santoshi  于2021年12月4日周六 02:11写道:

> Hello folks,
>
> 2 questions
>  1. If we have enabled enable.auto.commit and enabled checkpointing and we
> restart a flink application ( without checkpoint or savepoint ) , would the
> kafka consumer start consuming from the last offset committed to kafka.
>
> 2. What if in the above scenario, we have "auto.offset.reset" set to
> "latest". ? Would that ignore the consumer group offset in kafka ?
>
>
> Regards.
>


Re: KafkaSink.builder setDeliveryGuarantee is not a member

2021-12-02 Thread Hang Ruan
Hi,
It seems to be an error in the documentation. `setDeliverGuarantee` is a
method of `KafkaSinkBuilder`, not of `KafkaRecordSerializationSchemaBuilder`,
which is why the compiler reports it as not a member.

It can be used like this: KafkaSink.builder().setDeliverGuarantee(...)
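
A minimal Java sketch of where the guarantee goes in 1.14; the topic, the
bootstrap servers and the string serialization schema below are placeholders,
not taken from your job:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")   // placeholder
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("mytopic")     // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        // The delivery guarantee is configured on the sink builder itself,
        // not on the record-serializer builder.
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();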

Lars Skjærven  于2021年12月2日周四 19:34写道:

> Hello,
> upgrading to 1.14 I bumped into an issue with the kafka sink builder when
> defining delivery guarantee:
>
> value setDeliveryGuarantee is not a member of
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[]
>
>
> Seems to be working with the default value (i.e. without mentioning
> setDeliveryGuarantee), but compile error when including it.
>
> Is it better to leave it with the default, and let the application cluster
> config define this ?
>
> I believe I build the KafkaSink according to the docs:
>
> import org.apache.flink.connector.base.DeliveryGuarantee
> import
> org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema,
> KafkaSink}
> import org.apache.flink.connector.kafka.source.KafkaSource
> import
> org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
>
> val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff
> ]()
>   .setBootstrapServers("...")
>   .setRecordSerializer(
> KafkaRecordSerializationSchema
>   .builder[SomePBStuff]()
>   .setTopic("mytopic")
>   .setKeySerializationSchema((v: SomePBStuff) =>
> v.key.getBytes(StandardCharsets.UTF_8))
>   .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
>   .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>   .build()
>   )
>   .build()
>
>
> in build.sbt I have:
>
> ThisBuild / scalaVersion := "2.12.13"
> val flinkVersion = "1.14.0"
>
> val flinkDependencies = Seq(
>   "org.apache.flink" % "flink-runtime" % flinkVersion % Test,
>
>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion %
> "provided",
>   "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
>
>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>   "org.apache.flink" %% "flink-gelly-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
> )
>
>
>
>


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-12-01 Thread Hang Ruan
Sorry, that was a typo on my side; I meant the PR (pull request). Here it is:
https://github.com/apache/flink/pull/17276

Marco Villalobos  于2021年12月1日周三 下午9:18写道:

> Thank you. One last question.  What is an RP? Where can I read it?
>
> Marco
>
> On Nov 30, 2021, at 11:06 PM, Hang Ruan  wrote:
>
> Hi,
>
> In 1.12.0-1.12.5 versions, committing offset to kafka when the checkpoint
> is open is the default behavior in KafkaSourceBuilder. And it can not be
> changed in KafkaSourceBuilder.
>
> By this FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277>,
> we could change the behavior. This problem will be fixed in 1.12.6. It
> seems not to be contained in your version.
>
> Reading the RP will be helpful for you to understand the behavior.
>
>
> Marco Villalobos  于2021年12月1日周三 上午3:43写道:
>
>> Thanks!
>>
>> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
>> does not exist in Flink 1.12.
>>
>> Is that property supported with the string
>> "commit.offsets.on.checkpoints"?
>>
>> How do I configure that behavior so that offsets get committed on
>> checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
>> default behavior with checkpoints?
>>
>>
>>
>>
>> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan  wrote:
>>
>>> Hi,
>>>
>>> Maybe you can write like this :
>>> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
>>> "true");
>>>
>>> Other additional properties could be found here :
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>>>
>>> Marco Villalobos  于2021年11月30日周二 上午11:08写道:
>>>
>>>> Thank you for the information.  That still does not answer my question
>>>> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
>>>> that consumer should commit offsets back to Kafka on checkpoints?
>>>>
>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this
>>>> method.
>>>>
>>>> But now that I am using KafkaSourceBuilder, how do I configure that
>>>> behavior so that offsets get committed on checkpoints?  Or is that the
>>>> default behavior with checkpoints?
>>>>
>>>> -Marco
>>>>
>>>> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng 
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> Flink 1.14 release note states about this. See [1].
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>>>>
>>>>> Marco Villalobos  于2021年11月30日周二 上午7:12写道:
>>>>>
>>>>>> Hi everybody,
>>>>>>
>>>>>> I am using Flink 1.12 and migrating my code from using
>>>>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>>>>
>>>>>> FlinkKafkaConsumer has the method
>>>>>>
>>>>>> /**
>>>>>>>  * Specifies whether or not the consumer should commit offsets back
>>>>>>> to Kafka on checkpoints.
>>>>>>>  * This setting will only have effect if checkpointing is enabled
>>>>>>> for the job. If checkpointing isn't
>>>>>>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>>>>>>> "enable.auto.commit" (for 0.9+) property
>>>>>>>  * settings will be used.
>>>>>>> */
>>>>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>>>>
>>>>>>
>>>>>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>>>>>> already have checkpointing configured, is it necessary to setup "commit
>>>>>> offsets on checkpoints"?
>>>>>>
>>>>>> The Flink 1.12 documentation does not discuss this topic, and the
>>>>>> Flink 1.14 documentation says little about it.
>>>>>>
>>>>>>  For example, the Flink 1.14 documentation states:
>>>>>>
>>>>>> Additional Properties
>>>>>>> In addition to properties described above, you can set arbitrary
>>>>>>> properties for KafkaSource and KafkaConsumer by using
>>>>>>> setProperties(Properties) and setProperty(String, String). KafkaSource 
>>>>>>> has
>>>>>>> following options for configuration:
>>>>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>>>>> offsets to Kafka brokers on checkpoint
>>>>>>
>>>>>>
>>>>>> And the 1.12 documentation states:
>>>>>>
>>>>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>>>>> offsets, together with the state of other operations. In case of a job
>>>>>>> failure, Flink will restore the streaming program to the state of the
>>>>>>> latest checkpoint and re-consume the records from Kafka, starting from 
>>>>>>> the
>>>>>>> offsets that were stored in the checkpoint.
>>>>>>> The interval of drawing checkpoints therefore defines how much the
>>>>>>> program may have to go back at most, in case of a failure. To use fault
>>>>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be 
>>>>>>> enabled
>>>>>>> in the job.
>>>>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>>>>> commit the offsets to Zookeeper.
>>>>>>
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>> Marco
>>>>>>
>>>>>>
>>>>>>
>


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-30 Thread Hang Ruan
Hi,

In versions 1.12.0-1.12.5, committing offsets to Kafka when checkpointing is
enabled is the default behavior of KafkaSourceBuilder, and it cannot be
changed through KafkaSourceBuilder.

FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277> makes this
behavior configurable. The fix will be included in 1.12.6, so it does not
seem to be available in your version yet.

Reading the PR will be helpful for understanding the behavior.


Marco Villalobos  于2021年12月1日周三 上午3:43写道:

> Thanks!
>
> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
> does not exist in Flink 1.12.
>
> Is that property supported with the string "commit.offsets.on.checkpoints"?
>
> How do I configure that behavior so that offsets get committed on
> checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
> default behavior with checkpoints?
>
>
>
>
> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan  wrote:
>
>> Hi,
>>
>> Maybe you can write like this :
>> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
>> "true");
>>
>> Other additional properties could be found here :
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>>
>> Marco Villalobos  于2021年11月30日周二 上午11:08写道:
>>
>>> Thank you for the information.  That still does not answer my question
>>> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
>>> that consumer should commit offsets back to Kafka on checkpoints?
>>>
>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this
>>> method.
>>>
>>> But now that I am using KafkaSourceBuilder, how do I configure that
>>> behavior so that offsets get committed on checkpoints?  Or is that the
>>> default behavior with checkpoints?
>>>
>>> -Marco
>>>
>>> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng 
>>> wrote:
>>>
>>>> Hi!
>>>>
>>>> Flink 1.14 release note states about this. See [1].
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>>>
>>>> Marco Villalobos  于2021年11月30日周二 上午7:12写道:
>>>>
>>>>> Hi everybody,
>>>>>
>>>>> I am using Flink 1.12 and migrating my code from using
>>>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>>>
>>>>> FlinkKafkaConsumer has the method
>>>>>
>>>>> /**
>>>>>>  * Specifies whether or not the consumer should commit offsets back
>>>>>> to Kafka on checkpoints.
>>>>>>  * This setting will only have effect if checkpointing is enabled for
>>>>>> the job. If checkpointing isn't
>>>>>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>>>>>> "enable.auto.commit" (for 0.9+) property
>>>>>>  * settings will be used.
>>>>>> */
>>>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>>>
>>>>>
>>>>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>>>>> already have checkpointing configured, is it necessary to setup "commit
>>>>> offsets on checkpoints"?
>>>>>
>>>>> The Flink 1.12 documentation does not discuss this topic, and the
>>>>> Flink 1.14 documentation says little about it.
>>>>>
>>>>>  For example, the Flink 1.14 documentation states:
>>>>>
>>>>> Additional Properties
>>>>>> In addition to properties described above, you can set arbitrary
>>>>>> properties for KafkaSource and KafkaConsumer by using
>>>>>> setProperties(Properties) and setProperty(String, String). KafkaSource 
>>>>>> has
>>>>>> following options for configuration:
>>>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>>>> offsets to Kafka brokers on checkpoint
>>>>>
>>>>>
>>>>> And the 1.12 documentation states:
>>>>>
>>>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>>>> offsets, together with the state of other operations. In case of a job
>>>>>> failure, Flink will restore the streaming program to the state of the
>>>>>> latest checkpoint and re-consume the records from Kafka, starting from 
>>>>>> the
>>>>>> offsets that were stored in the checkpoint.
>>>>>> The interval of drawing checkpoints therefore defines how much the
>>>>>> program may have to go back at most, in case of a failure. To use fault
>>>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be 
>>>>>> enabled
>>>>>> in the job.
>>>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>>>> commit the offsets to Zookeeper.
>>>>>
>>>>>
>>>>> Thank you.
>>>>>
>>>>> Marco
>>>>>
>>>>>
>>>>>


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Hang Ruan
Hi,

Maybe you can write it like this:
builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
"true");

The other additional properties can be found here:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
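
A minimal sketch putting that property in context; the servers, topic and
group id are placeholders, and the commit.offsets.on.checkpoint option is not
available in 1.12.0-1.12.5, as discussed elsewhere in this thread:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);   // offsets are committed when checkpoints complete

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")   // placeholder
        .setTopics("input-topic")                // placeholder
        .setGroupId("my-group")                  // placeholder
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // Explicitly keep committing consumed offsets to Kafka on checkpoints.
        .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
        .build();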

Marco Villalobos  于2021年11月30日周二 上午11:08写道:

> Thank you for the information.  That still does not answer my question
> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
> that consumer should commit offsets back to Kafka on checkpoints?
>
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this method.
>
> But now that I am using KafkaSourceBuilder, how do I configure that
> behavior so that offsets get committed on checkpoints?  Or is that the
> default behavior with checkpoints?
>
> -Marco
>
> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> Flink 1.14 release note states about this. See [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>
>> Marco Villalobos  于2021年11月30日周二 上午7:12写道:
>>
>>> Hi everybody,
>>>
>>> I am using Flink 1.12 and migrating my code from using
>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>
>>> FlinkKafkaConsumer has the method
>>>
>>> /**
  * Specifies whether or not the consumer should commit offsets back to
 Kafka on checkpoints.
  * This setting will only have effect if checkpointing is enabled for
 the job. If checkpointing isn't
  * enabled, only the "auto.commit.enable" (for 0.8) /
 "enable.auto.commit" (for 0.9+) property
  * settings will be used.
 */
 FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>
>>>
>>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>>> already have checkpointing configured, is it necessary to setup "commit
>>> offsets on checkpoints"?
>>>
>>> The Flink 1.12 documentation does not discuss this topic, and the Flink
>>> 1.14 documentation says little about it.
>>>
>>>  For example, the Flink 1.14 documentation states:
>>>
> >>> Additional Properties
> >>> In addition to properties described above, you can set arbitrary
> >>> properties for KafkaSource and KafkaConsumer by using
> >>> setProperties(Properties) and setProperty(String, String). KafkaSource has
> >>> following options for configuration:
> >>> commit.offsets.on.checkpoint specifies whether to commit consuming
> >>> offsets to Kafka brokers on checkpoint
>>>
>>>
>>> And the 1.12 documentation states:
>>>
> >>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
> >>> consume records from a topic and periodically checkpoint all its Kafka
> >>> offsets, together with the state of other operations. In case of a job
> >>> failure, Flink will restore the streaming program to the state of the
> >>> latest checkpoint and re-consume the records from Kafka, starting from the
> >>> offsets that were stored in the checkpoint.
> >>> The interval of drawing checkpoints therefore defines how much the
> >>> program may have to go back at most, in case of a failure. To use fault
> >>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
> >>> in the job.
> >>> If checkpointing is disabled, the Kafka consumer will periodically
> >>> commit the offsets to Zookeeper.
>>>
>>>
>>> Thank you.
>>>
>>> Marco
>>>
>>>
>>>


<    1   2