How to handle dirty data in Flink SQL?

2023-10-28 Thread casel.chen
Scenario: we use Flink SQL to write data into a downstream OLAP system such
as Doris. Some records are abnormal, for example a field value exceeds the
column length, or the partition-field value refers to a partition that does
not yet exist in the Doris table (it has to be created manually first).
Currently, writing such dirty records makes the sink report an error, and the
whole job fails. We would like to route these "dirty" records to a separate
Kafka topic, or write them to a file, so they can be audited afterwards. Is
there a way to do this today?
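Flink SQL itself does not expose a dead-letter channel for sink failures, so one possible workaround is to drop down to the DataStream API, validate records before the sink, and divert rejects to a side output that feeds a separate Kafka topic or file. A minimal sketch (the 16-character limit and the plain string stream are hypothetical stand-ins for the real Doris column constraints):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DirtyDataRouting {

    // Side-output tag for records that fail validation (anonymous subclass
    // so Flink can capture the type information).
    static final OutputTag<String> DIRTY = new OutputTag<String>("dirty") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input =
            env.fromElements("ok", "way-too-long-value-for-the-column");

        SingleOutputStreamOperator<String> clean = input.process(
            new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                    // Hypothetical rule standing in for a VARCHAR(16) limit in Doris.
                    if (value.length() <= 16) {
                        out.collect(value);        // main output -> Doris sink
                    } else {
                        ctx.output(DIRTY, value);  // side output -> "dirty" topic / audit file
                    }
                }
            });

        clean.print();                       // stand-in for the Doris sink
        clean.getSideOutput(DIRTY).print();  // stand-in for a KafkaSink on the audit topic
        env.execute("dirty-data-routing");
    }
}
```

The side-output stream can be written with any sink, so rejected rows stay inspectable without failing the job; the validation rules themselves have to mirror whatever the Doris table actually enforces.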

Re: Enhancing File Processing and Kafka Integration with Flink Jobs

2023-10-28 Thread Alexander Fedulov
> Or was it the querying of the checkpoints you were advising against?

Yes, I meant the approach, not the file removal itself. Mainly because how
exactly FileSource stores its state is an implementation detail, and there
are no external guarantees for its consistency even between minor versions.
On top of that, the original author of the State Processor API has moved to
another project, so it has not been actively worked on recently. I am not
sure it is even possible to access the FileSource state directly with it,
since FLIP-27 sources do not use the OperatorState abstraction directly [1].

[1]
https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L510

Best,
Alexander

On Sat, 28 Oct 2023 at 16:13, Andrew Otto  wrote:

> > This is not a robust solution, I would advise against it.
> Oh no?  Am curious as to why not.  It seems not dissimilar to how Kafka
> topic retention works: the messages are removed after some time period
> (hopefully after they are processed), so why would it be bad to remove
> files that are already processed?
>
> Or was it the querying of the checkpoints you were advising against?
>
> To be sure, I was referring to moving the previously processed files away,
> not the checkpoints themselves.

Re: Enhancing File Processing and Kafka Integration with Flink Jobs

2023-10-28 Thread Andrew Otto
> This is not a robust solution, I would advise against it.
Oh no?  Am curious as to why not.  It seems not dissimilar to how Kafka
topic retention works: the messages are removed after some time period
(hopefully after they are processed), so why would it be bad to remove
files that are already processed?

Or was it the querying of the checkpoints you were advising against?

To be sure, I was referring to moving the previously processed files away,
not the checkpoints themselves.

On Fri, Oct 27, 2023 at 12:45 PM Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:

> > I wonder if you could use this fact to query the committed checkpoints
> and move them away after the job is done.
>
> This is not a robust solution, I would advise against it.
>
> Best,
> Alexander
>
> On Fri, 27 Oct 2023 at 16:41, Andrew Otto  wrote:
>
>> For moving the files:
>> > It will keep the files as is and remember the name of the file read in
>> checkpointed state to ensure it doesn't read the same file twice.
>>
>> I wonder if you could use this fact to query the committed checkpoints
>> and move them away after the job is done.  I think it should even be safe
>> to do this outside of the Flink job periodically (cron, whatever), because
>> on restart it won't reprocess the files that have been committed in the
>> checkpoints.
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/libs/state_processor_api/#reading-state
>>
>>
>>
>>
>> On Fri, Oct 27, 2023 at 1:13 AM arjun s  wrote:
>>
>>> Hi team, Thanks for your quick response.
>>> I have an inquiry regarding file processing in the event of a job
>>> restart. When the job is restarted, we encounter challenges in tracking
>>> which files have been processed and which remain pending. Is there a method
>>> to seamlessly resume processing files from where they were left off,
>>> particularly in situations where we need to submit and restart the job
>>> manually due to any server restart or application restart? This becomes an
>>> issue when the job processes all the files in the directory from the
>>> beginning after a restart, and I'm seeking a solution to address this.
>>>
>>> Thanks and regards,
>>> Arjun
>>>
>>> On Fri, 27 Oct 2023 at 07:29, Chirag Dewan 
>>> wrote:
>>>
 Hi Arjun,

 Flink's FileSource doesn't move or delete the files as of now. It will
 keep the files as they are and remember the names of the files it has read in
 checkpointed state, to ensure it doesn't read the same file twice.

 Flink's source API works in such a way that a single Enumerator operates on
 the JobManager. The enumerator is responsible for listing the files and
 splitting them into smaller units. These units can be a complete file
 (in case of row formats) or splits within a file (for bulk formats). The
 reading is done by SplitReaders in the TaskManagers. This way it ensures
 that only the reading is done concurrently, and it is able to track file
 completions.
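 As a rough illustration of the enumerator/reader split described above, this is how a directory-watching source can be declared with the FLIP-27 FileSource builder. A sketch only (the directory path and the intervals are placeholder values):

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DirectoryWatchJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is what persists the "already read" file set
        // that the enumerator keeps on the JobManager.
        env.enableCheckpointing(10_000L);

        FileSource<String> source = FileSource
            // Row format: files are read line by line.
            .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/incoming"))
            // Keep the enumerator watching the directory for new files.
            .monitorContinuously(Duration.ofSeconds(10))
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
           .print();
        env.execute("directory-watch");
    }
}
```

 With monitorContinuously, the enumerator keeps listing the directory on the JobManager while the parallel SplitReaders on the TaskManagers do the actual reading.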

 You can read more in the Flink documentation on Sources and on the
 FileSystem connector, which provides a unified Source and Sink for BATCH
 and STREAMING.


 On Thursday, 26 October, 2023 at 06:53:23 pm IST, arjun s <
 arjunjoice...@gmail.com> wrote:


 Hello team,
 I'm currently in the process of configuring a Flink job. This job
 entails reading files from a specified directory and then transmitting the
 data to a Kafka sink. I've already successfully designed a Flink job that
 reads the file contents in a streaming manner and effectively sends them to
 Kafka. However, my specific requirement is a bit more intricate. I need the
 job to not only read these files and push the data to Kafka but also
 relocate the processed file to a different directory once all of its
 contents have been processed. Following this, the job should seamlessly
 transition to processing the next file in the source directory.
 Additionally, I have some concerns regarding how the job will behave if it
 encounters a restart. Could you please advise if this is achievable, and if
 so, provide guidance or code to implement it?

 I'm also quite interested in how the job will handle situations where
 the source has a parallelism greater than 2 or 3, and how it can accurately
 monitor the completion of reading all contents in each file.

 Thanks and Regards,
 Arjun
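 On the Kafka side of the requirement above, the sink can be built with the KafkaSink builder. A sketch with placeholder broker and topic names (the delivery guarantee and the string serializer here are assumptions, not the only options):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KafkaSinkSketch {
    // Builds a sink that writes each string record to one Kafka topic.
    public static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")       // placeholder broker address
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("file-events")                 // placeholder topic name
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    }
}
```

 Attach it with stream.sinkTo(buildSink()). Regarding the restart concern: FileSource only skips already-read files when the job is resumed from a checkpoint or savepoint (e.g. flink run -s <path> ...); a fresh submission starts with empty state, which is why every file in the directory is processed again.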

>>>


Re: Which Flink engine versions do Connectors support?

2023-10-28 Thread Xianxun Ye
Hi Gordon,

Thanks for your information. That is what I need.

And I have responded to the Kafka connector RC vote mail.


Best regards,
Xianxun

> On Oct 28, 2023, at 04:13, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Xianxun,
> 
> You can find the list of supported Flink versions for each connector here:
> https://flink.apache.org/downloads/#apache-flink-connectors
> 
> Specifically for the Kafka connector, we're in the process of releasing a new 
> version of the connector that works with Flink 1.18.
> The release candidate vote thread is here if you want to test that out: 
> https://lists.apache.org/thread/35gjflv4j2pp2h9oy5syj2vdfpotg486
> 
> Thanks,
> Gordon
> 
> 
> On Fri, Oct 27, 2023 at 12:57 PM Xianxun Ye  wrote:
>> 
>> Hello Team, 
>> 
>> After the release of Flink 1.18, I found that most connectors had been
>> externalized, e.g. the Kafka, ES, HBase, JDBC, and Pulsar connectors. But I
>> didn't find any manual or code indicating which versions of Flink these
>> connectors work with.
>> 
>> 
>> Best regards,
>> Xianxun
>>
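
For reference, the externalized connectors are released independently of Flink, and the supported Flink minor version is encoded as a suffix in the artifact version. A sketch of the Maven dependency (the exact version number here is an assumption; the downloads page linked above is authoritative):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <!-- pattern: <connector-version>-<supported-flink-minor-version> -->
    <version>3.0.1-1.18</version>
</dependency>
```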