Hi Kamal,

In a similar situation, when a decoding failure happened I would generate a special sentinel record that I could then detect and filter out (incrementing a counter) in a FilterFunction immediately following the source.
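To make the idea concrete, here is a minimal plain-Java sketch of that sentinel-record pattern, deliberately written without any Flink dependencies. The `CORRUPT` marker, the `decode` stand-in, and the `AtomicLong` counter are all illustrative stand-ins: in a real job the filter would be a RichFilterFunction, and the counter would be a Flink Counter metric registered in `open()` via `getRuntimeContext().getMetricGroup().counter(...)`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class SentinelFilterSketch {
    // Sentinel record: the decoder never throws; corrupt input maps to
    // this well-known value, which a downstream filter counts and drops.
    static final String CORRUPT = "__CORRUPT__";

    // Stand-in for the parquet decode step: any record that fails to
    // parse becomes the sentinel instead of raising an exception.
    static String decode(String raw) {
        try {
            Integer.parseInt(raw);
            return raw;
        } catch (NumberFormatException e) {
            return CORRUPT;
        }
    }

    public static void main(String[] args) {
        // Stands in for a Flink Counter metric in the real FilterFunction.
        AtomicLong corruptCounter = new AtomicLong();

        List<String> decoded = Arrays.asList("1", "oops", "3").stream()
                .map(SentinelFilterSketch::decode)
                // FilterFunction equivalent: count and drop sentinels,
                // pass everything else through unchanged.
                .filter(r -> {
                    if (CORRUPT.equals(r)) {
                        corruptCounter.incrementAndGet();
                        return false;
                    }
                    return true;
                })
                .collect(Collectors.toList());

        System.out.println(decoded + " corrupt=" + corruptCounter.get());
    }
}
```

The point of the pattern is that the filter runs as an ordinary operator with a RuntimeContext, so the metric registration that is impossible inside the StreamFormat reader becomes trivial one step downstream.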
— Ken

> On Jun 16, 2023, at 2:11 AM, Kamal Mittal via user <user@flink.apache.org> wrote:
>
> Hello,
>
> Any way forward, please suggest.
>
> Rgds,
> Kamal
>
> From: Kamal Mittal via user <user@flink.apache.org>
> Sent: 15 June 2023 10:39 AM
> To: Shammon FY <zjur...@gmail.com>
> Cc: user@flink.apache.org
> Subject: RE: Flink bulk and record file source format metrices
>
> Hello,
>
> I need one counter metric for the number of corrupt records encountered while decoding parquet records at the data source level. I know where the corrupt-record handling is required, but since no "SourceContext" or "RuntimeContext" exists there, I am unable to do anything with metrics.
>
> It is needed in a similar way to how the "SourceReaderBase" class maintains a counter for the number of records emitted.
>
> Rgds,
> Kamal
>
> From: Shammon FY <zjur...@gmail.com>
> Sent: 14 June 2023 05:33 PM
> To: Kamal Mittal <kamal.mit...@ericsson.com>
> Cc: user@flink.apache.org
> Subject: Re: Flink bulk and record file source format metrices
>
> Hi Kamal,
>
> Can you give more information about the metrics you want? In Flink, each source task has one source reader which already has some metrics; you can refer to the metrics doc[1] for more detailed information.
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/
>
> Best,
> Shammon FY
>
> On Tue, Jun 13, 2023 at 11:13 AM Kamal Mittal via user <user@flink.apache.org> wrote:
>
> Hello,
>
> Using the Flink record stream format file source API as below for reading parquet records.
>
> FileSource.FileSourceBuilder<?> source =
>     FileSource.forRecordStreamFormat(streamformat, path);
> source.monitorContinuously(Duration.ofMillis(10000));
>
> I want to log/generate metrics for corrupt records, and for that I need to record Flink metrics at the source level in the parquet reader class. Is there any way to do that, given that right now no handle for SourceContext is available?
>
> Rgds,
> Kamal

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch