Thanks for the pointers. I will try these changes.

From: Jungtaek Lim <kabhwan.opensou...@gmail.com>
Date: Saturday, 22 August 2020 at 2:41 PM
To: GOEL Rajat <rajat.g...@thalesgroup.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Structured Streaming metric for count of delayed/late data

I proposed another approach that provides an accurate count, though the number
doesn't always mean those rows were dropped.
(See https://github.com/apache/spark/pull/24936 for details.)

Btw, the limitation only applies to streaming aggregation, so you can implement
the aggregation yourself via (flat)MapGroupsWithState. Note that the local
aggregation is an "optimization", so you may need to account for the performance
impact.
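For illustration, a minimal sketch of a hand-rolled aggregation with
flatMapGroupsWithState that also counts the rows it considers late. The Event
schema, the rate source, the key derivation, and the 10-minute lateness
threshold are all illustrative assumptions, not something from this thread:

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, eventTime: Timestamp)
case class AggState(maxEventMs: Long, count: Long, lateCount: Long)
case class AggOut(key: String, count: Long, lateCount: Long)

val spark = SparkSession.builder().appName("manual-agg").getOrCreate()
import spark.implicits._

// Placeholder source: the built-in rate source emits `timestamp` and `value`.
val events = spark.readStream.format("rate").load()
  .selectExpr("CAST(value % 10 AS STRING) AS key", "timestamp AS eventTime")
  .as[Event]

val delayMs = 10 * 60 * 1000L  // 10-minute allowed lateness (assumption)

val counted = events
  .groupByKey(_.key)
  .flatMapGroupsWithState[AggState, AggOut](
      OutputMode.Update, GroupStateTimeout.NoTimeout) { (key, rows, state) =>
    val prev = state.getOption.getOrElse(AggState(0L, 0L, 0L))
    var (maxMs, cnt, late) = (prev.maxEventMs, prev.count, prev.lateCount)
    rows.foreach { e =>
      val t = e.eventTime.getTime
      // Count rows behind our own threshold instead of silently dropping them.
      if (t < maxMs - delayMs) late += 1 else cnt += 1
      maxMs = math.max(maxMs, t)
    }
    state.update(AggState(maxMs, cnt, late))
    Iterator(AggOut(key, cnt, late))
  }

counted.writeStream.outputMode("update").format("console").start()

Rolling your own like this keeps every input row visible, but it gives up the
built-in partial aggregation, so expect more shuffle traffic on
high-cardinality keys.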

On Sat, Aug 22, 2020 at 1:29 PM GOEL Rajat <rajat.g...@thalesgroup.com> wrote:
Thanks for pointing me to the Spark ticket and its limitations. Will try these
changes.
Is there any workaround for this limitation of the inaccurate count, maybe by
adding some additional streaming operation to the SS job, without impacting perf
too much?

Regards,
Rajat

From: Jungtaek Lim <kabhwan.opensou...@gmail.com>
Date: Friday, 21 August 2020 at 12:07 PM
To: Yuanjian Li <xyliyuanj...@gmail.com>
Cc: GOEL Rajat <rajat.g...@thalesgroup.com>, "user@spark.apache.org"
<user@spark.apache.org>
Subject: Re: Structured Streaming metric for count of delayed/late data

One more thing to note: unfortunately, the number is not accurate relative to
the input rows on streaming aggregation, because Spark aggregates locally first
and counts dropped inputs against the locally pre-aggregated rows. You may want
to treat the number as an indicator of whether inputs are being dropped, rather
than as an exact count.
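For context, a minimal sketch of the kind of query being discussed; the rate
source and the 10-minute/5-minute intervals are illustrative assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("watermark-demo").getOrCreate()
import spark.implicits._

// Placeholder source: the built-in rate source emits `timestamp` and `value`.
val events = spark.readStream.format("rate").load()

val windowedCounts = events
  .withWatermark("timestamp", "10 minutes")        // allowed lateness
  .groupBy(window($"timestamp", "5 minutes"))
  .count()

// Within each task Spark partially aggregates rows before the stateful
// operator, so rows "dropped by watermark" are counted against those
// partial aggregates, not against the raw input rows.

windowedCounts.writeStream.outputMode("update").format("console").start()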

On Fri, Aug 21, 2020 at 3:31 PM Yuanjian Li <xyliyuanj...@gmail.com> wrote:
The metrics have been added in
https://issues.apache.org/jira/browse/SPARK-24634, but the target version is
3.1.
Maybe you can backport it for testing, since it's not a big change.
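Once backported, the counts should show up in the query progress. A minimal
sketch of reading them via a listener, assuming the metric lands as
numRowsDroppedByWatermark on StateOperatorProgress (verify the field name in
the version you backport to) and an active SparkSession named spark:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Each stateful operator reports its own dropped-row count per micro-batch.
    event.progress.stateOperators.foreach { op =>
      println(s"rows dropped by watermark: ${op.numRowsDroppedByWatermark}")
    }
  }
})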

Best,
Yuanjian

On Thu, Aug 20, 2020 at 9:14 PM GOEL Rajat <rajat.g...@thalesgroup.com> wrote:
Hi All,

I have a query, if someone can please help. Is there any metric or mechanism
for printing the count of input records dropped due to watermarking (late data
count) during a window-based aggregation in Structured Streaming? I am using
Spark 3.0.

Thanks & Regards,
Rajat