Hi Kamal,

In a similar situation, when a decoding failure happened, I would generate a 
special record that I could then detect and filter out (incrementing a counter 
at the same time) in a FilterFunction immediately following the source.
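A minimal plain-Java sketch of that pattern (names like CORRUPT_MARKER and the 
integer "decoder" are illustrative stand-ins, not real Flink or parquet APIs). 
In an actual job the filter would be a RichFilterFunction whose open() registers 
a Counter via getRuntimeContext().getMetricGroup().counter(...):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class CorruptRecordFilterSketch {

    // Sentinel record emitted by the decoder when input cannot be parsed.
    // In Flink this could be a special field/flag on your record type.
    static final String CORRUPT_MARKER = "__CORRUPT__";

    // Decoder that never throws: on failure it emits the sentinel instead.
    // Stands in for the parquet record decoding inside the StreamFormat reader.
    static String decode(String raw) {
        try {
            return Integer.toString(Integer.parseInt(raw));
        } catch (NumberFormatException e) {
            return CORRUPT_MARKER;
        }
    }

    public static void main(String[] args) {
        // In Flink this would be a metric-group Counter registered in open().
        AtomicLong corruptCounter = new AtomicLong();

        // Models: source emits decoded records, FilterFunction right after it
        // counts and drops the sentinels so downstream sees only good records.
        List<String> kept = List.of("1", "oops", "2").stream()
            .map(CorruptRecordFilterSketch::decode)
            .filter(r -> {
                if (CORRUPT_MARKER.equals(r)) {
                    corruptCounter.incrementAndGet();
                    return false; // drop the corrupt-record marker
                }
                return true;
            })
            .toList();

        System.out.println(kept + " corrupt=" + corruptCounter.get());
        // prints: [1, 2] corrupt=1
    }
}
```

The key point is that the decoder itself stays metric-free; the counting happens 
in the filter operator, which does have a RuntimeContext.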

— Ken


> On Jun 16, 2023, at 2:11 AM, Kamal Mittal via user <user@flink.apache.org> 
> wrote:
> 
> Hello,
>  
> Any way-forward, please suggest.
>  
> Rgds,
> Kamal
>  
> From: Kamal Mittal via user <user@flink.apache.org 
> <mailto:user@flink.apache.org>> 
> Sent: 15 June 2023 10:39 AM
> To: Shammon FY <zjur...@gmail.com <mailto:zjur...@gmail.com>>
> Cc: user@flink.apache.org <mailto:user@flink.apache.org>
> Subject: RE: Flink bulk and record file source format metrices
>  
> Hello,
>  
> I need one counter metric for the number of corrupt records encountered while 
> decoding parquet records at the data source level. I know where the corrupt 
> record handling needs to happen, but because neither a “SourceContext” nor a 
> “RuntimeContext” is available there, I am unable to register any metric.
>  
> It is needed in the same way that the “SourceReaderBase” class maintains a 
> counter for the number of records emitted.
>  
> Rgds,
> Kamal
>  
> From: Shammon FY <zjur...@gmail.com <mailto:zjur...@gmail.com>> 
> Sent: 14 June 2023 05:33 PM
> To: Kamal Mittal <kamal.mit...@ericsson.com 
> <mailto:kamal.mit...@ericsson.com>>
> Cc: user@flink.apache.org <mailto:user@flink.apache.org>
> Subject: Re: Flink bulk and record file source format metrices
>  
> Hi Kamal,
>  
> Can you give more information about the metrics you want? In Flink, each source 
> task has one source reader which already has some metrics; you can refer to the 
> metrics doc[1] for more detailed information.
>  
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ 
> <https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/>
>  
> Best,
> Shammon FY
>  
> On Tue, Jun 13, 2023 at 11:13 AM Kamal Mittal via user <user@flink.apache.org 
> <mailto:user@flink.apache.org>> wrote:
> Hello,
>  
> I am using the Flink record stream format file source API as below for reading 
> parquet records.
>  
> FileSource.FileSourceBuilder<?> source = 
> FileSource.forRecordStreamFormat(streamformat, path);
> source.monitorContinuously(Duration.ofMillis(10000));
>  
> I want to generate metrics for corrupt records, and for that I need to register 
> Flink metrics at the source level in the parquet reader class. Is there any way 
> to do that, given that no handle to a SourceContext is currently available?
>  
> Rgds,
> Kamal

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
