Re: Using Spark Accumulators with Structured Streaming
On Thu, May 28, 2020 at 7:04 PM, ZHANG Wei <wezh...@outlook.com> wrote:

May I get how the accumulator is accessed in the method `onQueryProgress()`?

AFAICT, the accumulator is incremented well. There is a way to verify that in a cluster, like this:

```
// Add the following while loop before invoking awaitTermination
while (true) {
    println("My acc: " + myAcc.value)
    Thread.sleep(5 * 1000)
}

// query.awaitTermination()
```

The updated accumulator value can then be found in the driver stdout.

--
Cheers,
-z

On Thu, 28 May 2020 17:12:48 +0530, Srinivas V wrote:

Yes, I am using stateful structured streaming, similar to what you do. This is in Java; I do it this way:

```
Dataset productUpdates = watermarkedDS
    .groupByKey(
        (MapFunction<..., String>) event -> event.getId(), Encoders.STRING())
    .mapGroupsWithState(
        new StateUpdateTask(
            Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
            appConfig, accumulators),
        Encoders.bean(ModelStateInfo.class),
        Encoders.bean(ModelUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());
```

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM, Something Something <mailinglist...@gmail.com> wrote:

Yes, that's exactly how I am creating them.

Question: are you using 'Stateful Structured Streaming', in which you have something like this?

```
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
    updateAcrossEvents
)
```

And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.

On Wed, May 27, 2020 at 9:01 AM, Srinivas V <srini@gmail.com> wrote:

Yes, I am talking about application-specific Accumulators. Actually, I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is "client" on a YARN cluster (not a local Mac), where I submit from the master node. It should work the same for cluster mode as well.

Create accumulators like this:

```
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
```

On Tue, May 26, 2020 at 8:42 PM, Something Something <mailinglist...@gmail.com> wrote:

Hmm... how would they go to Grafana if they are not getting computed in your code? I am talking about the application-specific Accumulators. The other standard counters, such as 'event.progress.inputRowsPerSecond', are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM, Srinivas V <srini@gmail.com> wrote:

Hello,
Even for me it comes as 0 when I print in onQueryProgress. I use LongAccumulator as well. Yes, it prints on my local machine but not on the cluster. One consolation is that when I send metrics to Grafana, the values do come through there.

On Tue, May 26, 2020 at 3:10 AM, Something Something <mailinglist...@gmail.com> wrote:

No, this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM, ZHANG Wei <wezh...@outlook.com> wrote:

There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread safe. I'm wondering whether the implementation for `java.util.Map[T, Long]` can meet that or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and other code are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

From: Something Something
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark development team tell me whether this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. To add more context: we have our own Accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:

```
class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]
```

When the job begins, we register an instance of this class:

```
spark.sparkContext.register(myAccumulator, "MyAccumulator")
```

Is this working under Structured Streaming?

I will keep looking for alternate approaches, but any help would be greatly appreciated. Thanks.

On Thu, May 14, 2020 at 2:36 PM, Something Something <mailinglist...@gmail.com> wrote:

In my structured streaming job, I am updating Spark Accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code:

```
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
    updateAcrossEvents
)
```

The accumulators get incremented in 'updateAcrossEvents'. I have a StreamingListener which writes the values of the accumulators in its 'onQueryProgress' method, but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in 'cluster' mode. In local mode it works fine, which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Here's the definition: > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>> class CollectionLongAccumulator[T] > > > > > >>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]] > > > > > >>>>>> > > > > > >>>>>> When the job begins we register an instance of this class: > > > > > >>>>>> > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator") > > > > > >>>>>> > > > > > >>>>>> Is this working under Structured Streaming? > > > > > >>>>>> > > > > > >>>>>> I will keep looking for alternate approaches but any help > > would be > > > > > >>>>>> greatly appreciated. Thanks. > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something < > > > > > >>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> > > wrote: > > > > > >>>>>> > > > > > >>>>>> In my structured streaming job I am updating Spark > > Accumulators in > > > > > >>>>>> the updateAcrossEvents method but they are always 0 when I > > try to > > > > print > > > > > >>>>>> them in my StreamingListener. Here's the code: > > > > > >>>>>> > > > > > >>>>>> > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( > > > > > >>>>>> updateAcrossEvents > > > > > >>>>>> ) > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. > > I've a > > > > > >>>>>> StreamingListener which writes values of the accumulators in > > > > > >>>>>> 'onQueryProgress' method but in this method the Accumulators > > are > > > > ALWAYS > > > > > >>>>>> ZERO! > > > > > >>>>>> > > > > > >>>>>> When I added log statements in the updateAcrossEvents, I > > could see > > > > > >>>>>> that these accumulators are getting incremented as expected. > > > > > >>>>>> > > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local > > mode > > > > it > > > > > >>>>>> works fine which implies that the Accumulators are not getting > > > > distributed > > > > > >>>>>> correctly - or something like that! 
> > > > > >>>>>> > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to > > > > > >>>>>> perform an "Action". That's not a solution here. This is a > > > > 'Stateful > > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them > > in > > > > > >>>>>> SparkContext. > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>> > > > > > > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
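ZHANG Wei's caveat above, that the OUT type of an AccumulatorV2 must be atomic or thread safe, can be illustrated without Spark at all. The sketch below is plain Java with illustrative names (it is not the poster's actual `CollectionLongAccumulator`): a "map of counts" backed by `ConcurrentHashMap`, whose per-key `merge` is atomic, so concurrent task threads cannot lose increments the way they can with a plain `HashMap`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: the kind of thread-safe "map of counts" an
// AccumulatorV2[T, java.util.Map[T, Long]] implementation would need.
// Plain Java, independent of Spark; the class name is hypothetical.
public class CountsByKey {
    // ConcurrentHashMap.merge is atomic per key.
    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    public void add(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    public long get(String key) {
        return counts.getOrDefault(key, 0L);
    }

    public static void main(String[] args) throws InterruptedException {
        CountsByKey acc = new CountsByKey();
        Runnable worker = () -> {
            for (int i = 0; i < 10_000; i++) acc.add("events");
        };
        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // No increment is lost across the two threads.
        System.out.println(acc.get("events")); // prints 20000
    }
}
```

With a plain `HashMap` the same two-thread run can print less than 20000 (or corrupt the map), which is exactly the kind of failure the "atomic or thread safe" restriction guards against.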
Re: Using Spark Accumulators with Structured Streaming
Yes, it is an application specific class. This is how Java Spark functions work. You can refer to this code in the documentation:
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java

    public class StateUpdateTask implements MapGroupsWithStateFunction {
        @Override
        public ModelUpdate call(String productId, Iterator eventsIterator,
                                GroupState state) {
        }
    }

On Thu, May 28, 2020 at 10:59 PM Something Something wrote:

> I am assuming StateUpdateTask is your application specific class. Does
> it have 'updateState' method or something? I googled but couldn't find
> any documentation about doing it this way. Can you please direct me to
> some documentation. Thanks.
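Because the update logic lives in a plain class whose dependencies (timeout, config, accumulators) arrive through the constructor, it can be exercised without a cluster by calling `call` directly. The sketch below uses simplified stand-in types; they are NOT Spark's real `MapGroupsWithStateFunction` or `GroupState` API, and the counter is an `AtomicLong` standing in for a Spark accumulator. It only shows the shape of injecting counters through the constructor:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for illustration only; Spark's real
// MapGroupsWithStateFunction has a richer signature with GroupState.
interface GroupFunction<K, V, R> {
    R call(K key, Iterator<V> values) throws Exception;
}

// Hypothetical sketch of the StateUpdateTask pattern from the thread:
// counters are injected via the constructor, so the same instance can be
// handed to mapGroupsWithState or driven directly in a unit test.
class StateUpdateTask implements GroupFunction<String, String, Long> {
    private final AtomicLong eventsSeen; // stand-in for a Spark accumulator

    StateUpdateTask(AtomicLong eventsSeen) {
        this.eventsSeen = eventsSeen;
    }

    @Override
    public Long call(String productId, Iterator<String> events) {
        long n = 0;
        while (events.hasNext()) {
            events.next();
            eventsSeen.incrementAndGet(); // counter update per event
            n++;
        }
        return n; // stand-in for the ModelUpdate result
    }
}

public class StateUpdateTaskDemo {
    public static void main(String[] args) throws Exception {
        AtomicLong acc = new AtomicLong();
        StateUpdateTask task = new StateUpdateTask(acc);
        long n = task.call("p1", Arrays.asList("e1", "e2", "e3").iterator());
        System.out.println(n + " " + acc.get()); // prints "3 3"
    }
}
```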
I am assuming StateUpdateTask is your application-specific class. Does it have an 'updateState' method or something similar? I googled but couldn't find any documentation about doing it this way. Can you please direct me to some documentation? Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V wrote:

> Yes, I am using stateful structured streaming, similar to what you do.
> This is in Java; I do it this way:
>
>     Dataset productUpdates = watermarkedDS
>         .groupByKey(
>             (MapFunction) event -> event.getId(), Encoders.STRING())
>         .mapGroupsWithState(
>             new StateUpdateTask(
>                 Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>                 appConfig, accumulators),
>             Encoders.bean(ModelStateInfo.class),
>             Encoders.bean(ModelUpdate.class),
>             GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
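For what it's worth, the cluster behaviour being debated here can be modelled without Spark. Roughly speaking, Spark serializes the objects a task closes over (including accumulator references) and ships independent copies to executors; the driver's instance only reflects task-side increments after the task copies are merged back. Below is a minimal, stdlib-only Java sketch of that copy/add/merge lifecycle. The class and method names are illustrative assumptions, not Spark's API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative stand-in for an AccumulatorV2-style counter; only the
// copy/add/merge contract is modelled here.
class SketchAccumulator implements Serializable {
    private long value = 0;

    void add(long v) { value += v; }
    long value() { return value; }
    void merge(SketchAccumulator other) { value += other.value; }

    // Simulate shipping the accumulator to an executor: Java serialization
    // yields an independent copy, much like Spark's closure serialization.
    SketchAccumulator copyViaSerialization() throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(this);
        oos.flush();
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (SketchAccumulator) in.readObject();
    }
}

public class AccumulatorLifecycle {
    public static void main(String[] args) throws Exception {
        SketchAccumulator driverCopy = new SketchAccumulator();
        SketchAccumulator taskCopy = driverCopy.copyViaSerialization();

        taskCopy.add(42);                       // happens on the "executor"
        System.out.println(driverCopy.value()); // still 0: copies are independent

        driverCopy.merge(taskCopy);             // Spark merges on task completion
        System.out.println(driverCopy.value()); // 42
    }
}
```

In real Spark the merge back into the driver instance happens automatically when a task completes, which is one reason a driver-side listener can observe 0 while task-side logs show increments.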
Re: Using Spark Accumulators with Structured Streaming
> >> On Tue, May 26, 2020 at 8:42 PM Something Something < > > >> mailinglist...@gmail.com> wrote: > > >> > > >>> Hmm... how would they go to Graphana if they are not getting > computed in > > >>> your code? I am talking about the Application Specific Accumulators. > The > > >>> other standard counters such as 'event.progress.inputRowsPerSecond' > are > > >>> getting populated correctly! > > >>> > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V > wrote: > > >>> > > >>>> Hello, > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use > > >>>> LongAccumulator as well. Yes, it prints on my local but not on > cluster. > > >>>> But one consolation is that when I send metrics to Graphana, the > values > > >>>> are coming there. > > >>>> > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something < > > >>>> mailinglist...@gmail.com> wrote: > > >>>> > > >>>>> No this is not working even if I use LongAccumulator. > > >>>>> > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei > wrote: > > >>>>> > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type > should > > >>>>>> be atomic or thread safe. I'm wondering if the implementation for > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance > to replace > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or > LongAccumulator[3] > > >>>>>> and test if the StreamingListener and other codes are able to > work? 
> > >>>>>> > > >>>>>> --- > > >>>>>> Cheers, > > >>>>>> -z > > >>>>>> [1] > > >>>>>> > https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2data=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435%7C1%7C0%7C637262629816034378sdata=73AxOzjhvImCuhXPoMN%2Bm7%2BY3KYwwaoCvmYMoOEGDtU%3Dreserved=0 > > >>>>>> [2] > > >>>>>> > https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulatordata=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435%7C1%7C0%7C637262629816034378sdata=BY%2BtYoheicPCByUh2YWlmezHhg9ruKIDlndKQD06N%2FM%3Dreserved=0 > > >>>>>> [3] > > >>>>>> > https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulatordata=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435%7C1%7C0%7C637262629816034378sdata=IosZ%2Fs2CclFuHT8nL8btCU8Geh2%2FjV94DtwxEEoN8F8%3Dreserved=0 > > >>>>>> > > >>>>>> > > >>>>>> From: Something Something > > >>>>>> Sent: Saturday, May 16, 2020 0:38 > > >>>>>> To: spark-user > > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming > > >>>>>> > > >>>>>> Can someone from Spark Development team tell me if this > functionality > > >>>>>> is supported and tested? I've spent a lot of time on this but > can't get it > > >>>>>> to work. Just to add more context, we've our own Accumulator > class that > > >>>>>> extends from AccumulatorV2. In this class we keep track of one or > more > > >>>>>> accumulators. 
Here's the definition: > > >>>>>> > > >>>>>> > > >>>>>> class CollectionLongAccumulator[T] > > >>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]] > > >>>>>> > > >>>>>> When the job begins we register an instance of this class: > > >>>>>> > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator") > > >>>>>> > > >>>>>> Is this working under Structured Streaming? > > >>>>>> > > >>>>>> I will keep looking for alternate approaches but any help would be > > >>>>>> greatly appreciated. Thanks. > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something < > > >>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote: > > >>>>>> > > >>>>>> In my structured streaming job I am updating Spark Accumulators in > > >>>>>> the updateAcrossEvents method but they are always 0 when I try to > print > > >>>>>> them in my StreamingListener. Here's the code: > > >>>>>> > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( > > >>>>>> updateAcrossEvents > > >>>>>> ) > > >>>>>> > > >>>>>> > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a > > >>>>>> StreamingListener which writes values of the accumulators in > > >>>>>> 'onQueryProgress' method but in this method the Accumulators are > ALWAYS > > >>>>>> ZERO! > > >>>>>> > > >>>>>> When I added log statements in the updateAcrossEvents, I could see > > >>>>>> that these accumulators are getting incremented as expected. > > >>>>>> > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode > it > > >>>>>> works fine which implies that the Accumulators are not getting > distributed > > >>>>>> correctly - or something like that! > > >>>>>> > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to > > >>>>>> perform an "Action". That's not a solution here. This is a > 'Stateful > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in > > >>>>>> SparkContext. 
> > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> >
Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming', in which you have something like this?

    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
      updateAcrossEvents
    )

And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.
Yes, I am talking about Application-specific Accumulators. Actually, I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is "client" on a YARN cluster (not local Mac), where I submit from the master node. It should work the same for cluster mode as well.

Create accumulators like this:

    AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
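The `accumulators` object passed into StateUpdateTask earlier in the thread is presumably a collection of such named accumulators. A stdlib-only sketch of that pattern is below; the `CounterRegistry` class and its methods are hypothetical stand-ins (not anything from the thread), using thread-safe types in line with the thread-safety caveat raised elsewhere in this discussion:

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical registry of named counters, standing in for a map of
// Spark LongAccumulators created via sparkContext.longAccumulator(name).
class CounterRegistry implements Serializable {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    void inc(String name) {
        // LongAdder is thread safe, so concurrent update tasks can
        // increment the same counter without losing updates.
        counters.computeIfAbsent(name, k -> new LongAdder()).increment();
    }

    long get(String name) {
        LongAdder a = counters.get(name);
        return a == null ? 0L : a.sum();
    }
}

public class RegistryDemo {
    public static void main(String[] args) {
        CounterRegistry reg = new CounterRegistry();
        reg.inc("events.processed");
        reg.inc("events.processed");
        reg.inc("events.dropped");
        System.out.println(reg.get("events.processed")); // 2
        System.out.println(reg.get("events.dropped"));   // 1
    }
}
```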
Hmm... how would they go to Grafana if they are not getting computed in your code? I am talking about the Application-Specific Accumulators. The other standard counters, such as 'event.progress.inputRowsPerSecond', are getting populated correctly!
Hello,
Even for me it comes as 0 when I print in onQueryProgress. I use LongAccumulator as well. Yes, it prints on my local machine but not on the cluster. But one consolation is that when I send metrics to Grafana, the values are coming through there.
Re: Using Spark Accumulators with Structured Streaming
No, this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei wrote:

> There is a restriction in the AccumulatorV2 API [1]: the OUT type should
> be atomic or thread safe. I'm wondering whether the implementation for
> `java.util.Map[T, Long]` can meet that or not. Is there any chance to
> replace CollectionLongAccumulator with CollectionAccumulator [2] or
> LongAccumulator [3] and test whether the StreamingListener and the other
> code work? [...]
Re: Using Spark Accumulators with Structured Streaming
There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread safe. I'm wondering whether the implementation for `java.util.Map[T, Long]` can meet that or not. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the other code work?

---
Cheers,
-z

[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

From: Something Something
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

> Can someone from the Spark development team tell me if this functionality
> is supported and tested? I've spent a lot of time on this but can't get it
> to work. [...]
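The point above about the OUT type needing to be atomic or thread safe can be illustrated without Spark: a plain HashMap updated from several threads can lose increments, whereas `ConcurrentHashMap.merge` is atomic per key. A small standalone Java sketch of the thread-safe variant (no Spark; the names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadSafeCounts {
    // A per-key counter whose updates are atomic, the kind of guarantee the
    // AccumulatorV2 OUT type needs when it is read and written concurrently.
    static long countConcurrently(int threads, int perThread) throws InterruptedException {
        Map<String, Long> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counts.merge("events", 1L, Long::sum); // atomic per key
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return counts.get("events");
    }

    public static void main(String[] args) throws InterruptedException {
        // 4 threads x 10000 increments must never lose an update
        System.out.println(countConcurrently(4, 10000)); // prints 40000
    }
}
```

With a plain `HashMap` in place of the `ConcurrentHashMap`, the same run can intermittently print less than 40000, because concurrent read-modify-write cycles overwrite each other.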
Re: Using Spark Accumulators with Structured Streaming
Can someone from the Spark development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. To add more context: we have our own accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:

    class CollectionLongAccumulator[T]
      extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

    spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches, but any help would be greatly appreciated. Thanks.

On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:

> In my structured streaming job I am updating Spark accumulators in the
> updateAcrossEvents method, but they are always 0 when I try to print them
> in my StreamingListener. [...]
Using Spark Accumulators with Structured Streaming
In my structured streaming job I am updating Spark accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code:

    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
      updateAcrossEvents
    )

The accumulators get incremented in 'updateAcrossEvents'. I have a StreamingListener which writes the values of the accumulators in its 'onQueryProgress' method, but in that method the accumulators are ALWAYS ZERO!

When I added log statements in updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in 'cluster' mode. In local mode it works fine, which implies that the accumulators are not getting distributed correctly, or something like that.

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here; this is a stateful Structured Streaming job. Yes, I am also registering them in the SparkContext.
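One general mechanism that fits the "works locally, zero in cluster mode" symptom (offered as background, not as a diagnosis of this specific job): objects captured in a task closure are serialized and shipped to executors, so any mutation happens on a deserialized copy, not on the driver's instance. Registered accumulators are special-cased by Spark to merge their updates back to the driver; ordinary objects are not. A standalone Java sketch of that copy semantics, with no Spark involved:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

public class CopySemantics {
    // Serialize and deserialize a map, analogous to how a task closure is
    // shipped from the driver JVM to an executor JVM.
    @SuppressWarnings("unchecked")
    static HashMap<String, Long> shipToExecutor(HashMap<String, Long> m) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new ObjectOutputStream(bytes).writeObject(m);
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        return (HashMap<String, Long>) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Long> driverSide = new HashMap<>();
        HashMap<String, Long> executorSide = shipToExecutor(driverSide);
        executorSide.merge("events", 1L, Long::sum); // "incremented on the executor"
        // the driver's copy never sees the update
        System.out.println(driverSide.getOrDefault("events", 0L)); // prints 0
    }
}
```

In local mode everything runs in one JVM, which is why the same code can appear to work there and report zeros only on a cluster.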