Yes, that's exactly how I am creating them:

    (MapFunction<..., String>) event -> event.getId(), Encoders.STRING())
        .mapGroupsWithState(
            ...(appConfig, accumulators),
            Encoders.bean(ModelStateInfo.class),
            Encoders.bean(ModelUpdate.class),
            ...)

> Question... Are you using 'Stateful Structured Streaming'
>
>     .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>         updateAcrossEvents
>     )
>
> And updating the Accumulator inside 'updateAcrossEvents'? We are using
> our own Application Specific Accumulators.
>> Actually I am getting the values printed in my driver log as well as
>> sent to Grafana. Not [...] same for cluster mode as well.
>> Create accumulators like this:
>>
>>     AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>
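[Editorial note: the `sparkContext.longAccumulator(name)` suggestion above relies on AccumulatorV2's lifecycle, which is worth spelling out. Below is a simplified, Spark-free model of that contract: add() runs on executors against a task-local copy, merge() folds copies back on the driver, value() is read on the driver. All class and method names are illustrative sketches, not Spark's actual API.]

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified, Spark-free model of the AccumulatorV2 contract.
// Names are hypothetical; this is a sketch, not Spark's implementation.
class LongAccumulatorModel {
    private final AtomicLong sum = new AtomicLong();

    void add(long v) { sum.addAndGet(v); }              // executor side
    void merge(LongAccumulatorModel other) {            // driver side
        sum.addAndGet(other.value());
    }
    LongAccumulatorModel copyAndReset() {               // fresh copy per task
        return new LongAccumulatorModel();
    }
    long value() { return sum.get(); }
}

public class AccumulatorContractDemo {
    public static void main(String[] args) {
        LongAccumulatorModel driverAcc = new LongAccumulatorModel();

        // Two simulated tasks, each updating its own zeroed copy.
        LongAccumulatorModel task1 = driverAcc.copyAndReset();
        LongAccumulatorModel task2 = driverAcc.copyAndReset();
        task1.add(3);
        task2.add(4);

        // The driver-side instance reads 0 until the task copies are
        // merged back, which is one way a listener can observe 0 values.
        System.out.println(driverAcc.value()); // prints 0
        driverAcc.merge(task1);
        driverAcc.merge(task2);
        System.out.println(driverAcc.value()); // prints 7
    }
}
```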
>>> [...] computed in your code? I am talking about the Application Specific
>>> Accumulators. The other standard counters such as
>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>
>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V wrote:
>>>
>>>> Hello,
>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>> But one consolation is that when I send metrics to Grafana, the
>>>> values are coming there.
>>>>
>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> No this is not working even if I use LongAccumulator.
>>>>>
>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei wrote:
>>>>>
>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>> should be atomic or thread safe. I'm wondering if the implementation
>>>>>> for `java.util.Map[T, Long]` can meet it or not. Is there any chance
>>>>>> to replace CollectionLongAccumulator by CollectionAccumulator[2] or
>>>>>> LongAccumulator[3] and test if the StreamingListener is able to work?
>>>>>>
>>>>>> ---
>>>>>> Cheers,
>>>>>> -z
>>>>>>
>>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
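[Editorial note: ZHANG Wei's point about the OUT type can be made concrete. A plain HashMap updated from several task threads can lose increments, while ConcurrentHashMap.merge performs each per-key read-modify-write atomically. The snippet below is a standalone Java illustration of a thread-safe Map[T, Long]-style accumulation, not Spark code; all names are hypothetical.]

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadSafeMapAccumulation {
    // Thread-safe stand-in for a Map[T, Long]-valued accumulator's OUT type.
    static long countConcurrently(int threads, int perThread) throws InterruptedException {
        Map<String, Long> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // merge() does the read-modify-write atomically per key;
                    // a plain HashMap here could lose updates under contention.
                    counts.merge("events", 1L, Long::sum);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        return counts.get("events");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countConcurrently(2, 10_000)); // prints 20000
    }
}
```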
________________________________________
From: Something Something
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark Development team tell me if this functionality
is supported and tested? I've spent a lot of time on this but can't get it
to work. Just to add more context: we have our own Accumulator class that
extends from AccumulatorV2. In this class we keep track of one or more
accumulators.

In my structured streaming job I am updating Spark Accumulators in the
updateAcrossEvents method but they are always 0 when I try to print them
in my StreamingListener. Here's the code:

    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
    )
The accumula[...]
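[Editorial note: the updateAcrossEvents pattern asked about above can be sketched without Spark. The snippet below models a per-group update function that bumps a shared counter, and a "listener" that reads that counter; it illustrates that the listener sees non-zero values only when it reads the same instance the update function mutates. All names are hypothetical, not Spark's API.]

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Spark-free sketch of the updateAcrossEvents pattern. Names are
// hypothetical; this is a simplified model, not mapGroupsWithState itself.
public class UpdateAcrossEventsSketch {
    // Stands in for the application-specific accumulator.
    static final AtomicLong eventsSeen = new AtomicLong();

    // Called once per group, the way updateAcrossEvents would be called;
    // returns a toy "state" (the event count for this batch).
    static int updateAcrossEvents(String key, List<String> events) {
        eventsSeen.addAndGet(events.size());
        return events.size();
    }

    public static void main(String[] args) {
        updateAcrossEvents("user-1", List.of("a", "b"));
        updateAcrossEvents("user-2", List.of("c"));

        // A listener sees non-zero values only because it reads the SAME
        // counter instance the update function mutates; reading a
        // serialized task-local copy would show 0.
        System.out.println(eventsSeen.get()); // prints 3
    }
}
```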