May I get how the accumulator is accessed in the method `onQueryProgress()`?

AFAICT, the accumulator is incremented well. There is a way to verify that
in cluster like this:
```
    // Add the following while loop before invoking awaitTermination
    while (true) {
      println("My acc: " + myAcc.value)
      Thread.sleep(5 * 1000)
    }

    //query.awaitTermination()
```

And the accumulator value updated can be found from driver stdout.

-- 
Cheers,
-z

On Thu, 28 May 2020 17:12:48 +0530
Srinivas V <srini....@gmail.com> wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>                 .groupByKey(
>                         (MapFunction<InputEventModel, String>) event ->
> event.getId(), Encoders.STRING())
>                 .mapGroupsWithState(
>                         new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
>                         Encoders.bean(ModelStateInfo.class),
>                         Encoders.bean(ModelUpdate.class),
>                         GroupStateTimeout.ProcessingTimeTimeout());
> 
> StateUpdateTask contains the update method.
> 
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglist...@gmail.com> wrote:
> 
> > Yes, that's exactly how I am creating them.
> >
> > Question... Are you using 'Stateful Structured Streaming' in which you've
> > something like this?
> >
> > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> >         updateAcrossEvents
> >       )
> >
> > And updating the Accumulator inside 'updateAcrossEvents'? We're 
> > experiencing this only under 'Stateful Structured Streaming'. In other 
> > streaming applications it works as expected.
> >
> >
> >
> > On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
> >
> >> Yes, I am talking about Application specific Accumulators. Actually I am
> >> getting the values printed in my driver log as well as sent to Grafana. Not
> >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> >> cluster(not local Mac) where I submit from master node. It should work the
> >> same for cluster mode as well.
> >> Create accumulators like this:
> >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> >>
> >>
> >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> >> mailinglist...@gmail.com> wrote:
> >>
> >>> Hmm... how would they go to Graphana if they are not getting computed in
> >>> your code? I am talking about the Application Specific Accumulators. The
> >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
> >>> getting populated correctly!
> >>>
> >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
> >>>
> >>>> Hello,
> >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> >>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
> >>>> But one consolation is that when I send metrics to Graphana, the values
> >>>> are coming there.
> >>>>
> >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> >>>> mailinglist...@gmail.com> wrote:
> >>>>
> >>>>> No this is not working even if I use LongAccumulator.
> >>>>>
> >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezh...@outlook.com> wrote:
> >>>>>
> >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
> >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
> >>>>>> replace
> >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
> >>>>>> LongAccumulator[3]
> >>>>>> and test if the StreamingListener and other codes are able to work?
> >>>>>>
> >>>>>> ---
> >>>>>> Cheers,
> >>>>>> -z
> >>>>>> [1]
> >>>>>> https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2&amp;data=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637262629816034378&amp;sdata=73AxOzjhvImCuhXPoMN%2Bm7%2BY3KYwwaoCvmYMoOEGDtU%3D&amp;reserved=0
> >>>>>> [2]
> >>>>>> https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulator&amp;data=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637262629816034378&amp;sdata=BY%2BtYoheicPCByUh2YWlmezHhg9ruKIDlndKQD06N%2FM%3D&amp;reserved=0
> >>>>>> [3]
> >>>>>> https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulator&amp;data=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637262629816034378&amp;sdata=IosZ%2Fs2CclFuHT8nL8btCU8Geh2%2FjV94DtwxEEoN8F8%3D&amp;reserved=0
> >>>>>>
> >>>>>> ________________________________________
> >>>>>> From: Something Something <mailinglist...@gmail.com>
> >>>>>> Sent: Saturday, May 16, 2020 0:38
> >>>>>> To: spark-user
> >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> >>>>>>
> >>>>>> Can someone from Spark Development team tell me if this functionality
> >>>>>> is supported and tested? I've spent a lot of time on this but can't 
> >>>>>> get it
> >>>>>> to work. Just to add more context, we've our own Accumulator class that
> >>>>>> extends from AccumulatorV2. In this class we keep track of one or more
> >>>>>> accumulators. Here's the definition:
> >>>>>>
> >>>>>>
> >>>>>> class CollectionLongAccumulator[T]
> >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> >>>>>>
> >>>>>> When the job begins we register an instance of this class:
> >>>>>>
> >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> >>>>>>
> >>>>>> Is this working under Structured Streaming?
> >>>>>>
> >>>>>> I will keep looking for alternate approaches but any help would be
> >>>>>> greatly appreciated. Thanks.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> >>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
> >>>>>>
> >>>>>> In my structured streaming job I am updating Spark Accumulators in
> >>>>>> the updateAcrossEvents method but they are always 0 when I try to print
> >>>>>> them in my StreamingListener. Here's the code:
> >>>>>>
> >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> >>>>>>         updateAcrossEvents
> >>>>>>       )
> >>>>>>
> >>>>>>
> >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> >>>>>> StreamingListener which writes values of the accumulators in
> >>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> >>>>>> ZERO!
> >>>>>>
> >>>>>> When I added log statements in the updateAcrossEvents, I could see
> >>>>>> that these accumulators are getting incremented as expected.
> >>>>>>
> >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
> >>>>>> works fine which implies that the Accumulators are not getting 
> >>>>>> distributed
> >>>>>> correctly - or something like that!
> >>>>>>
> >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
> >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> >>>>>> SparkContext.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to