I am assuming StateUpdateTask is your application specific class. Does it
have 'updateState' method or something? I googled but couldn't find any
documentation about doing it this way. Can you please direct me to some
documentation. Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com> wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>                 .groupByKey(
>                         (MapFunction<InputEventModel, String>) event ->
> event.getId(), Encoders.STRING())
>                 .mapGroupsWithState(
>                         new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
>                         Encoders.bean(ModelStateInfo.class),
>                         Encoders.bean(ModelUpdate.class),
>                         GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
>
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Yes, that's exactly how I am creating them.
>>
>> Question... Are you using 'Stateful Structured Streaming' in which you've
>> something like this?
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>         updateAcrossEvents
>>       )
>>
>> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing 
>> this only under 'Stateful Structured Streaming'. In other streaming 
>> applications it works as expected.
>>
>>
>>
>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
>>
>>> Yes, I am talking about Application specific Accumulators. Actually I am
>>> getting the values printed in my driver log as well as sent to Grafana. Not
>>> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>>> cluster(not local Mac) where I submit from master node. It should work the
>>> same for cluster mode as well.
>>> Create accumulators like this:
>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>
>>>
>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> Hmm... how would they go to Graphana if they are not getting computed
>>>> in your code? I am talking about the Application Specific Accumulators. The
>>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>>> getting populated correctly!
>>>>
>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>> values are coming there.
>>>>>
>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>> mailinglist...@gmail.com> wrote:
>>>>>
>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>
>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>>>
>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
>>>>>>> be atomic or thread safe. I'm wondering if the implementation for
>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>>> replace
>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>>> LongAccumulator[3]
>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>
>>>>>>> ---
>>>>>>> Cheers,
>>>>>>> -z
>>>>>>> [1]
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>> [2]
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>> [3]
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>
>>>>>>> ________________________________________
>>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>> To: spark-user
>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>
>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>> functionality is supported and tested? I've spent a lot of time on this 
>>>>>>> but
>>>>>>> can't get it to work. Just to add more context, we've our own 
>>>>>>> Accumulator
>>>>>>> class that extends from AccumulatorV2. In this class we keep track of 
>>>>>>> one
>>>>>>> or more accumulators. Here's the definition:
>>>>>>>
>>>>>>>
>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>
>>>>>>> When the job begins we register an instance of this class:
>>>>>>>
>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>
>>>>>>> Is this working under Structured Streaming?
>>>>>>>
>>>>>>> I will keep looking for alternate approaches but any help would be
>>>>>>> greatly appreciated. Thanks.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
>>>>>>>
>>>>>>> In my structured streaming job I am updating Spark Accumulators in
>>>>>>> the updateAcrossEvents method but they are always 0 when I try to print
>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>
>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>         updateAcrossEvents
>>>>>>>       )
>>>>>>>
>>>>>>>
>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>>> StreamingListener which writes values of the accumulators in
>>>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>>>> ZERO!
>>>>>>>
>>>>>>> When I added log statements in the updateAcrossEvents, I could see
>>>>>>> that these accumulators are getting incremented as expected.
>>>>>>>
>>>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>>>>>> works fine which implies that the Accumulators are not getting 
>>>>>>> distributed
>>>>>>> correctly - or something like that!
>>>>>>>
>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>> SparkContext.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>

Reply via email to