Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezh...@outlook.com> wrote:

> May I ask how the accumulator is accessed in the method
> `onQueryProgress()`?
>
> AFAICT, the accumulator is incremented correctly. There is a way to
> verify that in a cluster, like this:
>
> ```
> // Add the following while loop before invoking awaitTermination
> while (true) {
>   println("My acc: " + myAcc.value)
>   Thread.sleep(5 * 1000)
> }
>
> //query.awaitTermination()
> ```
>
> The updated accumulator value can then be found in the driver stdout.
>
> --
> Cheers,
> -z
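[For reference, a minimal Scala sketch of the kind of listener being discussed: a StreamingQueryListener that reads a driver-registered LongAccumulator in onQueryProgress. The class name and wiring are illustrative assumptions, not code from this thread.]

```
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.util.LongAccumulator

// Hypothetical listener that prints the accumulator value after every micro-batch.
class AccPrintingListener(myAcc: LongAccumulator) extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Runs on the driver; the value is whatever the executors have merged back so far.
    println(s"My acc after batch ${event.progress.batchId}: ${myAcc.value}")
  }
}

// Wiring, assuming an active SparkSession `spark` and a registered accumulator `myAcc`:
// spark.streams.addListener(new AccPrintingListener(myAcc))
```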
On Thu, 28 May 2020 17:12:48 +0530, Srinivas V <srini@gmail.com> wrote:

> Yes, I am using stateful structured streaming, similar to what you do.
> This is in Java. I do it this way:
>
> Dataset<ModelUpdate> productUpdates = watermarkedDS
>     .groupByKey(
>         (MapFunction<Event, String>) event -> event.getId(), Encoders.STRING())
>     .mapGroupsWithState(
>         new StateUpdateTask(
>             Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>             appConfig, accumulators),
>         Encoders.bean(ModelStateInfo.class),
>         Encoders.bean(ModelUpdate.class),
>         GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:

> Yes, that's exactly how I am creating them.
>
> Question... Are you using 'Stateful Structured Streaming', in which
> you have something like this?
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>     updateAcrossEvents
> )
>
> And updating the Accumulator inside 'updateAcrossEvents'? We're
> experiencing this only under 'Stateful Structured Streaming'. In other
> streaming applications it works as expected.
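[For reference, a minimal Scala sketch, not the poster's actual code, of an updateAcrossEvents-style function that increments an accumulator inside mapGroupsWithState; the Event and EventState types are assumptions for illustration.]

```
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.util.LongAccumulator

// Hypothetical event and state types, for illustration only.
case class Event(id: String, value: Long)
case class EventState(id: String, count: Long)

// Curried so the driver-registered accumulator is captured by the closure
// that Spark ships to the executors.
def updateAcrossEvents(myAcc: LongAccumulator)
    (id: String, events: Iterator[Event], state: GroupState[EventState]): EventState = {
  val incoming = events.size            // drains the iterator; fine for a sketch
  myAcc.add(incoming)                   // executor-side increment, merged back to the driver
  val updated = EventState(id, state.getOption.map(_.count).getOrElse(0L) + incoming)
  state.update(updated)
  updated
}

// Usage, assuming a keyed Dataset `grouped` and implicit encoders in scope:
// grouped.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents(myAcc))
```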


Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini@gmail.com> wrote:
> Yes, I am talking about application-specific accumulators. Actually, I am
> getting the values printed in my driver log as well as sent to Grafana.
> Not sure where and when I saw 0 before. My deploy mode is "client" on a
> YARN cluster (not a local Mac), where I submit from the master node. It
> should work the same for cluster mode as well.
> I create accumulators like this:
>
> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
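[For reference, the two creation styles discussed in this thread, side by side, as a short Scala sketch; it assumes an active SparkSession named `spark`, and CollectionLongAccumulator is the custom class from the original question quoted below.]

```
// Built-in accumulator: created and registered in one call.
val myAcc = spark.sparkContext.longAccumulator("MyAccumulator")

// Custom AccumulatorV2 subclass: must be registered explicitly.
val myCustomAcc = new CollectionLongAccumulator[String]()
spark.sparkContext.register(myCustomAcc, "MyCustomAccumulator")
```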
On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:

> Hmm... how would they go to Grafana if they are not getting computed in
> your code? I am talking about the application-specific accumulators. The
> other standard counters, such as 'event.progress.inputRowsPerSecond', are
> getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini@gmail.com> wrote:

> Hello,
> Even for me it comes as 0 when I print it in onQueryProgress. I use
> LongAccumulator as well. Yes, it prints on my local machine but not on
> the cluster. But one consolation is that when I send metrics to Grafana,
> the values are coming through there.

On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:

> No, this is not working even if I use LongAccumulator.
On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezh...@outlook.com> wrote:

> There is a restriction in the AccumulatorV2 API [1]: the OUT type should
> be atomic or thread-safe. I'm wondering whether the implementation for
> `java.util.Map[T, Long]` can meet that or not. Is there any chance to
> replace CollectionLongAccumulator with CollectionAccumulator [2] or
> LongAccumulator [3] and test whether the StreamingListener and the other
> code are able to work?
>
> ---
> Cheers,
> -z
> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
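[Following up on the OUT-type restriction ZHANG Wei describes above, here is one hedged Scala sketch of how a map-valued accumulator could be made thread-safe by backing it with a ConcurrentHashMap; this illustrates the suggestion and is not the poster's actual implementation.]

```
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.AccumulatorV2

// Counts occurrences of each key; the backing map is thread-safe and
// updates are atomic, which addresses the OUT-type restriction above.
class ConcurrentCollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]] {

  private val map = new ConcurrentHashMap[T, Long]()

  // Serializable so the accumulator can be shipped to executors intact.
  private val sum =
    new java.util.function.BiFunction[Long, Long, Long] with Serializable {
      override def apply(a: Long, b: Long): Long = a + b
    }

  override def isZero: Boolean = map.isEmpty

  override def copy(): ConcurrentCollectionLongAccumulator[T] = {
    val acc = new ConcurrentCollectionLongAccumulator[T]
    acc.map.putAll(map)
    acc
  }

  override def reset(): Unit = map.clear()

  override def add(v: T): Unit = map.merge(v, 1L, sum)

  override def merge(other: AccumulatorV2[T, java.util.Map[T, Long]]): Unit = {
    val it = other.value.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      map.merge(e.getKey, e.getValue, sum)
    }
  }

  override def value: java.util.Map[T, Long] = map
}
```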
> ________________________________
> From: Something Something <mailinglist...@gmail.com>
> Sent: Saturday, May 16, 2020 0:38
> To: spark-user
> Subject: Re: Using Spark Accumulators with Structured Streaming
>
> Can someone from the Spark development team tell me whether this
> functionality is supported and tested? I've spent a lot of time on this
> but can't get it to work. Just to add more context: we have our own
> accumulator class that extends AccumulatorV2. In this class we keep
> track of one or more accumulators. Here's the definition:
>
> class CollectionLongAccumulator[T]
>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>
> When the job begins, we register an instance of this class:
>
> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>
> Is this working under Structured Streaming?
>
> I will keep looking for alternate approaches, but any help would be
> greatly appreciated. Thanks.

On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:
> In my structured streaming job I am updating Spark accumulators in the
> updateAcrossEvents method, but they are always 0 when I try to print
> them in my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>     updateAcrossEvents
> )
>
> The accumulators get incremented in 'updateAcrossEvents'. I've a
> StreamingListener which writes the values of the accumulators in its
> 'onQueryProgress' method, but in this method the accumulators are
> ALWAYS ZERO!
>
> When I added log statements in updateAcrossEvents, I could see that
> these accumulators are getting incremented as expected.
>
> This only happens when I run in 'cluster' mode. In local mode it works
> fine, which implies that the accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform
> an "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.


Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
gt; updateAcrossEvents
>>> > > > > > > >   )
>>> > > > > > > >
>>> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'?
>>> We're
>>> > > > > > experiencing this only under 'Stateful Structured Streaming'.
>>> In other
>>> > > > > > streaming applications it works as expected.
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <
>>> srini@gmail.com>
>>> > > > > > wrote:
>>> > > > > > > >
>>> > > > > > > >> Yes, I am talking about Application specific Accumulators.
>>> > > > Actually I
>>> > > > > > am
>>> > > > > > > >> getting the values printed in my driver log as well as
>>> sent to
>>> > > > > > Grafana. Not
>>> > > > > > > >> sure where and when I saw 0 before. My deploy mode is
>>> “client” on
>>> > > > a
>>> > > > > > yarn
>>> > > > > > > >> cluster(not local Mac) where I submit from master node.
>>> It should
>>> > > > > > work the
>>> > > > > > > >> same for cluster mode as well.
>>> > > > > > > >> Create accumulators like this:
>>> > > > > > > >> AccumulatorV2 accumulator =
>>> sparkContext.longAccumulator(name);
>>> > > > > > > >>
>>> > > > > > > >>
>>> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>> > > > > > > >> mailinglist...@gmail.com> wrote:
>>> > > > > > > >>
>>> > > > > > > >>> Hmm... how would they go to Graphana if they are not
>>> getting
>>> > > > > > computed in
>>> > > > > > > >>> your code? I am talking about the Application Specific
>>> > > > Accumulators.
>>> > > > > > The
>>> > > > > > > >>> other standard counters such as
>>> > > > 'event.progress.inputRowsPerSecond'
>>> > > > > > are
>>> > > > > > > >>> getting populated correctly!
>>> > > > > > > >>>
>>> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <
>>> srini@gmail.com>
>>> > > > > > wrote:
>>> > > > > > > >>>
>>> > > > > > > >>>> Hello,
>>> > > > > > > >>>> Even for me it comes as 0 when I print in
>>> OnQueryProgress. I use
>>> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but
>>> not on
>>> > > > > > cluster.
>>> > > > > > > >>>> But one consolation is that when I send metrics to
>>> Graphana, the
>>> > > > > > values
>>> > > > > > > >>>> are coming there.
>>> > > > > > > >>>>
>>> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>> > > > > > > >>>> mailinglist...@gmail.com> wrote:
>>> > > > > > > >>>>
>>> > > > > > > >>>>> No this is not working even if I use LongAccumulator.
>>> > > > > > > >>>>>
>>> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <
>>> wezh...@outlook.com
>>> > > > >
>>> > > > > > wrote:
>>> > > > > > > >>>>>
>>> > > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the
>>> OUT type
>>> > > > > > should
>>> > > > > > > >>>>>> be atomic or thread safe. I'm wondering if the
>>> implementation
>>> > > > for
>>> > > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there
>>> any
>>> > > > chance
>>> > > > > > to replace
>>> > > > > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2]
>>> or
>>> > > > > > LongAccumulator[3]
>>> > > > > > > >>>>>> and test if the StreamingListener and other codes are
>>> able to
>>> > > > > > work?
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> ---
>>> > > > > > > >>>>>> Cheers,
>>> > > > > > > >>>>>> -z
>>> > > > > > > >>>>>> [1]
>>> > > > > > > >>>>>>
>>> > > > > >
>>> > > >
>>> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435%7C1%7C0%7C637263729860033353sdata=NPpiZC%2Bnx9rec6G35QvMDV1D3FgvD%2FnIct6OJ06I728%3Dreserved=0
>>> > > > > > > >>>>>> [2]
>>> > > > > > > >>>>>>
>>> > > > > >
>>> > > >
>>> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulatordata=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435%7C1%7C0%7C637263729860038343sdata=wMskE72per9Js0V7UHJ0qi4UzCEEYh%2Fk53fuP2e92mA%3Dreserved=0
>>> > > > > > > >>>>>> [3]
>>> > > > > > > >>>>>>
>>> > > > > >
>>> > > >
>>> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulatordata=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435%7C1%7C0%7C637263729860038343sdata=INgHzc0rc6jj7UapB%2FRLfCiGNWEBSKWfgmuJ2dUZ3eM%3Dreserved=0
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> 
>>> > > > > > > >>>>>> From: Something Something 
>>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
>>> > > > > > > >>>>>> To: spark-user
>>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured
>>> > > > Streaming
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> Can someone from Spark Development team tell me if
>>> this
>>> > > > > > functionality
>>> > > > > > > >>>>>> is supported and tested? I've spent a lot of time on
>>> this but
>>> > > > > > can't get it
>>> > > > > > > >>>>>> to work. Just to add more context, we've our own
>>> Accumulator
>>> > > > > > class that
>>> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep
>>> track of
>>> > > > one or
>>> > > > > > more
>>> > > > > > > >>>>>> accumulators. Here's the definition:
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> class CollectionLongAccumulator[T]
>>> > > > > > > >>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> When the job begins we register an instance of this
>>> class:
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator,
>>> "MyAccumulator")
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> Is this working under Structured Streaming?
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> I will keep looking for alternate approaches but any
>>> help
>>> > > > would be
>>> > > > > > > >>>>>> greatly appreciated. Thanks.
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>> > > > > > > >>>>>> mailinglist...@gmail.com>> mailinglist...@gmail.com>>
>>> > > > wrote:
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> In my structured streaming job I am updating Spark
>>> > > > Accumulators in
>>> > > > > > > >>>>>> the updateAcrossEvents method but they are always 0
>>> when I
>>> > > > try to
>>> > > > > > print
>>> > > > > > > >>>>>> them in my StreamingListener. Here's the code:
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>>
>>> > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> > > > > > > >>>>>> updateAcrossEvents
>>> > > > > > > >>>>>>   )
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> The accumulators get incremented in
>>> 'updateAcrossEvents'.
>>> > > > I've a
>>> > > > > > > >>>>>> StreamingListener which writes values of the
>>> accumulators in
>>> > > > > > > >>>>>> 'onQueryProgress' method but in this method the
>>> Accumulators
>>> > > > are
>>> > > > > > ALWAYS
>>> > > > > > > >>>>>> ZERO!
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> When I added log statements in the
>>> updateAcrossEvents, I
>>> > > > could see
>>> > > > > > > >>>>>> that these accumulators are getting incremented as
>>> expected.
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> This only happens when I run in the 'Cluster' mode.
>>> In Local
>>> > > > mode
>>> > > > > > it
>>> > > > > > > >>>>>> works fine which implies that the Accumulators are
>>> not getting
>>> > > > > > distributed
>>> > > > > > > >>>>>> correctly - or something like that!
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that
>>> tell me to
>>> > > > > > > >>>>>> perform an "Action". That's not a solution here. This
>>> is a
>>> > > > > > 'Stateful
>>> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also
>>> 'registering' them
>>> > > > in
>>> > > > > > > >>>>>> SparkContext.
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>>
>>> > > > > >
>>> > > >
>>> >
>>> > -
>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >
>>>
>>


Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Srinivas V
t; > > > > > AFAICT, the accumulator is incremented well. There is a way to
>> verify
>> > > > that
>> > > > > > in cluster like this:
>> > > > > > ```
>> > > > > > // Add the following while loop before invoking
>> awaitTermination
>> > > > > > while (true) {
>> > > > > >   println("My acc: " + myAcc.value)
>> > > > > >   Thread.sleep(5 * 1000)
>> > > > > > }
>> > > > > >
>> > > > > > //query.awaitTermination()
>> > > > > > ```
>> > > > > >
>> > > > > > And the accumulator value updated can be found from driver
>> stdout.
>> > > > > >
>> > > > > > --
>> > > > > > Cheers,
>> > > > > > -z
>> > > > > >
>> > > > > > On Thu, 28 May 2020 17:12:48 +0530
>> > > > > > Srinivas V  wrote:
>> > > > > >
>> > > > > > > yes, I am using stateful structured streaming. Yes similar to
>> what
>> > > > you
>> > > > > > do.
>> > > > > > > This is in Java
>> > > > > > > I do it this way:
>> > > > > > > Dataset productUpdates = watermarkedDS
>> > > > > > > .groupByKey(
>> > > > > > > (MapFunction> String>) event
>> > > > ->
>> > > > > > > event.getId(), Encoders.STRING())
>> > > > > > > .mapGroupsWithState(
>> > > > > > > new
>> > > > > > >
>> > > > > >
>> > > >
>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>> > > > > > > appConfig, accumulators),
>> > > > > > > Encoders.bean(ModelStateInfo.class),
>> > > > > > > Encoders.bean(ModelUpdate.class),
>> > > > > > >
>>  GroupStateTimeout.ProcessingTimeTimeout());
>> > > > > > >
>> > > > > > > StateUpdateTask contains the update method.
>> > > > > > >
>> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
>> > > > > > > mailinglist...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Yes, that's exactly how I am creating them.
>> > > > > > > >
>> > > > > > > > Question... Are you using 'Stateful Structured Streaming'
>> in which
>> > > > > > you've
>> > > > > > > > something like this?
>> > > > > > > >
>> > > > > > > >
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> > > > > > > > updateAcrossEvents
>> > > > > > > >   )
>> > > > > > > >
>> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'?
>> We're
>> > > > > > experiencing this only under 'Stateful Structured Streaming'.
>> In other
>> > > > > > streaming applications it works as expected.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <
>> srini@gmail.com>
>> > > > > > wrote:
>> > > > > > > >
>> > > > > > > >> Yes, I am talking about Application specific Accumulators.
>> > > > Actually I
>> > > > > > am
>> > > > > > > >> getting the values printed in my driver log as well as
>> sent to
>> > > > > > Grafana. Not
>> > > > > > > >> sure where and when I saw 0 before. My deploy mode is
>> “client” on
>> > > > a
>> > > > > > yarn
>> > > > > > > >> cluster(not local Mac) where I submit from master node. It
>> should
>> > > > > > work the
>> > > > > > > >> same for cluster

Re: Using Spark Accumulators with Structured Streaming

2020-06-07 Thread Something Something
 new
> > > > > > >
> > > > > >
> > > >
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > > > > appConfig, accumulators),
> > > > > > > Encoders.bean(ModelStateInfo.class),
> > > > > > > Encoders.bean(ModelUpdate.class),
> > > > > > >
>  GroupStateTimeout.ProcessingTimeTimeout());
> > > > > > >
> > > > > > > StateUpdateTask contains the update method.
> > > > > > >
> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > > > > > mailinglist...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Yes, that's exactly how I am creating them.
> > > > > > > >
> > > > > > > > Question... Are you using 'Stateful Structured Streaming' in
> which
> > > > > > you've
> > > > > > > > something like this?
> > > > > > > >
> > > > > > > >
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > > > updateAcrossEvents
> > > > > > > >   )
> > > > > > > >
> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'?
> We're
> > > > > > experiencing this only under 'Stateful Structured Streaming'. In
> other
> > > > > > streaming applications it works as expected.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <
> srini@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > >> Yes, I am talking about Application specific Accumulators.
> > > > Actually I
> > > > > > am
> > > > > > > >> getting the values printed in my driver log as well as sent
> to
> > > > > > Grafana. Not
> > > > > > > >> sure where and when I saw 0 before. My deploy mode is
> “client” on
> > > > a
> > > > > > yarn
> > > > > > > >> cluster(not local Mac) where I submit from master node. It
> should
> > > > > > work the
> > > > > > > >> same for cluster mode as well.
> > > > > > > >> Create accumulators like this:
> > > > > > > >> AccumulatorV2 accumulator =
> sparkContext.longAccumulator(name);
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > > > > > >> mailinglist...@gmail.com> wrote:
> > > > > > > >>
> > > > > > > >>> Hmm... how would they go to Graphana if they are not
> getting
> > > > > > computed in
> > > > > > > >>> your code? I am talking about the Application Specific
> > > > Accumulators.
> > > > > > The
> > > > > > > >>> other standard counters such as
> > > > 'event.progress.inputRowsPerSecond'
> > > > > > are
> > > > > > > >>> getting populated correctly!
> > > > > > > >>>
> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <
> srini@gmail.com>
> > > > > > wrote:
> > > > > > > >>>
> > > > > > > >>>> Hello,
> > > > > > > >>>> Even for me it comes as 0 when I print in
> OnQueryProgress. I use
> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but
> not on
> > > > > > cluster.
> > > > > > > >>>> But one consolation is that when I send metrics to
> Graphana, the
> > > > > > values
> > > > > > > >>>> are coming there.
> > > > > > > >>>>
> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > > > > > >>>> mailinglist...@gmail.com> wrote:
> > > > > 

Re: Using Spark Accumulators with Structured Streaming

2020-06-04 Thread ZHANG Wei
Events
> > > > > > >   )
> > > > > > >
> > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > > > > experiencing this only under 'Stateful Structured Streaming'. In other
> > > > > streaming applications it works as expected.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V 
> > > > > wrote:
> > > > > > >
> > > > > > >> Yes, I am talking about Application specific Accumulators.
> > > Actually I
> > > > > am
> > > > > > >> getting the values printed in my driver log as well as sent to
> > > > > Grafana. Not
> > > > > > >> sure where and when I saw 0 before. My deploy mode is “client” on
> > > a
> > > > > yarn
> > > > > > >> cluster(not local Mac) where I submit from master node. It should
> > > > > work the
> > > > > > >> same for cluster mode as well.
> > > > > > >> Create accumulators like this:
> > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > > > > >>
> > > > > > >>
> > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > > > > >> mailinglist...@gmail.com> wrote:
> > > > > > >>
> > > > > > >>> Hmm... how would they go to Graphana if they are not getting
> > > > > computed in
> > > > > > >>> your code? I am talking about the Application Specific
> > > Accumulators.
> > > > > The
> > > > > > >>> other standard counters such as
> > > 'event.progress.inputRowsPerSecond'
> > > > > are
> > > > > > >>> getting populated correctly!
> > > > > > >>>
> > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
> > > > > wrote:
> > > > > > >>>
> > > > > > >>>> Hello,
> > > > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I 
> > > > > > >>>> use
> > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > > > > cluster.
> > > > > > >>>> But one consolation is that when I send metrics to Graphana, 
> > > > > > >>>> the
> > > > > values
> > > > > > >>>> are coming there.
> > > > > > >>>>
> > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > > > > >>>> mailinglist...@gmail.com> wrote:
> > > > > > >>>>
> > > > > > >>>>> No this is not working even if I use LongAccumulator.
> > > > > > >>>>>
> > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  > > >
> > > > > wrote:
> > > > > > >>>>>
> > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > > > > should
> > > > > > >>>>>> be atomic or thread safe. I'm wondering if the implementation
> > > for
> > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any
> > > chance
> > > > > to replace
> > > > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > > > > LongAccumulator[3]
> > > > > > >>>>>> and test if the StreamingListener and other codes are able to
> > > > > work?
> > > > > > >>>>>>
> > > > > > >>>>>> ---
> > > > > > >>>>>> Cheers,
> > > > > > >>>>>> -z
> > > > > > >>>>>> [1]
> > > > > > >>>>>>
> > > > >
> > > https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2data=02%7C01%7C%7

Re: Using Spark Accumulators with Structured Streaming

2020-06-01 Thread ZHANG Wei
M Srinivas V 
> > > > wrote:
> > > > > >>>
> > > > > >>>> Hello,
> > > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > > > cluster.
> > > > > >>>> But one consolation is that when I send metrics to Graphana, the
> > > > values
> > > > > >>>> are coming there.
> > > > > >>>>
> > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > > > >>>> mailinglist...@gmail.com> wrote:
> > > > > >>>>
> > > > > >>>>> No this is not working even if I use LongAccumulator.
> > > > > >>>>>
> > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  > >
> > > > wrote:
> > > > > >>>>>
> > > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > > > should
> > > > > >>>>>> be atomic or thread safe. I'm wondering if the implementation
> > for
> > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any
> > chance
> > > > to replace
> > > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > > > LongAccumulator[3]
> > > > > >>>>>> and test if the StreamingListener and other codes are able to
> > > > work?
> > > > > >>>>>>
> > > > > >>>>>> ---
> > > > > >>>>>> Cheers,
> > > > > >>>>>> -z
> > > > > >>>>>> [1]
> > > > > >>>>>>
> > > >
> > https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435%7C1%7C0%7C637263729860033353sdata=NPpiZC%2Bnx9rec6G35QvMDV1D3FgvD%2FnIct6OJ06I728%3Dreserved=0
> > > > > >>>>>> [2]
> > > > > >>>>>>
> > > >
> > https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulatordata=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435%7C1%7C0%7C637263729860038343sdata=wMskE72per9Js0V7UHJ0qi4UzCEEYh%2Fk53fuP2e92mA%3Dreserved=0
> > > > > >>>>>> [3]
> > > > > >>>>>>
> > > >
> > https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulatordata=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435%7C1%7C0%7C637263729860038343sdata=INgHzc0rc6jj7UapB%2FRLfCiGNWEBSKWfgmuJ2dUZ3eM%3Dreserved=0
> > > > > >>>>>>
> > > > > >>>>>> 
> > > > > >>>>>> From: Something Something 
> > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > > > >>>>>> To: spark-user
> > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured
> > Streaming
> > > > > >>>>>>
> > > > > >>>>>> Can someone from Spark Development team tell me if this
> > > > functionality
> > > > > >>>>>> is supported and tested? I've spent a lot of time on this but
> > > > can't get it
> > > > > >>>>>> to work. Just to add more context, we've our own Accumulator
> > > > class that
> > > > > >>>>>> extends from AccumulatorV2. In this class we keep track of
> > one or
> > > > more
> > > > > >>>>>> accumulators. Here's the definition:
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> class CollectionLongAccumulator[T]
> > > > > >>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > > > >>>>>>
> > > > > >>>>>> When the job begins we register an instance of this class:
> > > > > >>>>>>
> > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > > > > >>>>>>
> > > > > >>>>>> Is this working under Structured Streaming?
> > > > > >>>>>>
> > > > > >>>>>> I will keep looking for alternate approaches but any help
> > would be
> > > > > >>>>>> greatly appreciated. Thanks.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > > > > >>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>>
> > wrote:
> > > > > >>>>>>
> > > > > >>>>>> In my structured streaming job I am updating Spark
> > Accumulators in
> > > > > >>>>>> the updateAcrossEvents method but they are always 0 when I
> > try to
> > > > print
> > > > > >>>>>> them in my StreamingListener. Here's the code:
> > > > > >>>>>>
> > > > > >>>>>>
> > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > >>>>>> updateAcrossEvents
> > > > > >>>>>>   )
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'.
> > I've a
> > > > > >>>>>> StreamingListener which writes values of the accumulators in
> > > > > >>>>>> 'onQueryProgress' method but in this method the Accumulators
> > are
> > > > ALWAYS
> > > > > >>>>>> ZERO!
> > > > > >>>>>>
> > > > > >>>>>> When I added log statements in the updateAcrossEvents, I
> > could see
> > > > > >>>>>> that these accumulators are getting incremented as expected.
> > > > > >>>>>>
> > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local
> > mode
> > > > it
> > > > > >>>>>> works fine which implies that the Accumulators are not getting
> > > > distributed
> > > > > >>>>>> correctly - or something like that!
> > > > > >>>>>>
> > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > > > > >>>>>> perform an "Action". That's not a solution here. This is a
> > > > 'Stateful
> > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them
> > in
> > > > > >>>>>> SparkContext.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > >
> >

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Using Spark Accumulators with Structured Streaming

2020-05-30 Thread Srinivas V
On Wed, May 27, 2020 at 9:01 AM Srinivas V 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, I am talking about Application specific Accumulators. Actually
>>>>>>>> I am getting the values printed in my driver log as well as sent to
>>>>>>>> Grafana. Not sure where and when I saw 0 before. My deploy mode is 
>>>>>>>> “client”
>>>>>>>> on a yarn cluster(not local Mac) where I submit from master node. It 
>>>>>>>> should
>>>>>>>> work the same for cluster mode as well.
>>>>>>>> Create accumulators like this:
>>>>>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hmm... how would they go to Graphana if they are not getting
>>>>>>>>> computed in your code? I am talking about the Application Specific
>>>>>>>>> Accumulators. The other standard counters such as
>>>>>>>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>>>>>>>
>>>>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>>>>>> LongAccumulator as well. Yes, it prints on my local but not on 
>>>>>>>>>> cluster.
>>>>>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>>>>>> values are coming there.
>>>>>>>>>>
>>>>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei 
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>>>>>> should be atomic or thread safe. I'm wondering if the 
>>>>>>>>>>>> implementation for
>>>>>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance 
>>>>>>>>>>>> to replace
>>>>>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>>>>>>>> LongAccumulator[3]
>>>>>>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>>>>>>
>>>>>>>>>>>> ---
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> -z
>>>>>>>>>>>> [1]
>>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>>>>> [2]
>>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>>>>> [3]
>>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>>>>
>>>>>>>>>>>> 
>>>>>>>>>>>> From: Something Something 
>>>>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>>>>> To: spark-user
>>>>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>>>>
>>>>>>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>>>>>>> functionality is supported and tested? I've spent a lot of time on 
>>>>>>>>>>>> this but
>>>>>>>>>>>> can't get it to work. Just to add more context, we've our own 
>>>>>>>>>>>> Accumulator
>>>>>>>>>>>> class that extends from AccumulatorV2. In this class we keep track 
>>>>>>>>>>>> of one
>>>>>>>>>>>> or more accumulators. Here's the definition:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>>>>
>>>>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>>>>
>>>>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>>>>
>>>>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>>>>
>>>>>>>>>>>> I will keep looking for alternate approaches but any help would
>>>>>>>>>>>> be greatly appreciated. Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> In my structured streaming job I am updating Spark Accumulators
>>>>>>>>>>>> in the updateAcrossEvents method but they are always 0 when I try 
>>>>>>>>>>>> to print
>>>>>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>>>>>
>>>>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>>>> updateAcrossEvents
>>>>>>>>>>>>   )
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've
>>>>>>>>>>>> a StreamingListener which writes values of the accumulators in
>>>>>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are 
>>>>>>>>>>>> ALWAYS
>>>>>>>>>>>> ZERO!
>>>>>>>>>>>>
>>>>>>>>>>>> When I added log statements in the updateAcrossEvents, I could
>>>>>>>>>>>> see that these accumulators are getting incremented as expected.
>>>>>>>>>>>>
>>>>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local
>>>>>>>>>>>> mode it works fine which implies that the Accumulators are not 
>>>>>>>>>>>> getting
>>>>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>>>>
>>>>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>>>>> perform an "Action". That's not a solution here. This is a 
>>>>>>>>>>>> 'Stateful
>>>>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>>>>> SparkContext.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
gt;>>>>>> should
>>>>>>> work the same for cluster mode as well.
>>>>>>> Create accumulators like this:
>>>>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hmm... how would they go to Graphana if they are not getting
>>>>>>>> computed in your code? I am talking about the Application Specific
>>>>>>>> Accumulators. The other standard counters such as
>>>>>>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>>>>>>
>>>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>>>>> LongAccumulator as well. Yes, it prints on my local but not on 
>>>>>>>>> cluster.
>>>>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>>>>> values are coming there.
>>>>>>>>>
>>>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>>>>
>>>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>>>>>>>
>>>>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>>>>> should be atomic or thread safe. I'm wondering if the 
>>>>>>>>>>> implementation for
>>>>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>>>>>>> replace
>>>>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>>>>>>> LongAccumulator[3]
>>>>>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>>>>>
>>>>>>>>>>> ---
>>>>>>>>>>> Cheers,
>>>>>>>>>>> -z
>>>>>>>>>>> [1]
>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>>>> [2]
>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>>>> [3]
>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>>>
>>>>>>>>>>> 
>>>>>>>>>>> From: Something Something 
>>>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>>>> To: spark-user
>>>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>>>
>>>>>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>>>>>> functionality is supported and tested? I've spent a lot of time on 
>>>>>>>>>>> this but
>>>>>>>>>>> can't get it to work. Just to add more context, we've our own 
>>>>>>>>>>> Accumulator
>>>>>>>>>>> class that extends from AccumulatorV2. In this class we keep track 
>>>>>>>>>>> of one
>>>>>>>>>>> or more accumulators. Here's the definition:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>>>
>>>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>>>
>>>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>>>
>>>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>>>
>>>>>>>>>>> I will keep looking for alternate approaches but any help would
>>>>>>>>>>> be greatly appreciated. Thanks.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> In my structured streaming job I am updating Spark Accumulators
>>>>>>>>>>> in the updateAcrossEvents method but they are always 0 when I try 
>>>>>>>>>>> to print
>>>>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>>>>
>>>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>>> updateAcrossEvents
>>>>>>>>>>>   )
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>>>>>>> StreamingListener which writes values of the accumulators in
>>>>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are 
>>>>>>>>>>> ALWAYS
>>>>>>>>>>> ZERO!
>>>>>>>>>>>
>>>>>>>>>>> When I added log statements in the updateAcrossEvents, I could
>>>>>>>>>>> see that these accumulators are getting incremented as expected.
>>>>>>>>>>>
>>>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local
>>>>>>>>>>> mode it works fine which implies that the Accumulators are not 
>>>>>>>>>>> getting
>>>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>>>
>>>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>>>> SparkContext.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Srinivas V
t;>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>>>> values are coming there.
>>>>>>>>
>>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>>>
>>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>>>>>>
>>>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>>>> should be atomic or thread safe. I'm wondering if the implementation 
>>>>>>>>>> for
>>>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>>>>>> replace
>>>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>>>>>> LongAccumulator[3]
>>>>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>>>>
>>>>>>>>>> ---
>>>>>>>>>> Cheers,
>>>>>>>>>> -z
>>>>>>>>>> [1]
>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>>> [2]
>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>>> [3]
>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>>
>>>>>>>>>> 
>>>>>>>>>> From: Something Something 
>>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>>> To: spark-user
>>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>>
>>>>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>>>>> functionality is supported and tested? I've spent a lot of time on 
>>>>>>>>>> this but
>>>>>>>>>> can't get it to work. Just to add more context, we've our own 
>>>>>>>>>> Accumulator
>>>>>>>>>> class that extends from AccumulatorV2. In this class we keep track 
>>>>>>>>>> of one
>>>>>>>>>> or more accumulators. Here's the definition:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>>
>>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>>
>>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>>
>>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>>
>>>>>>>>>> I will keep looking for alternate approaches but any help would
>>>>>>>>>> be greatly appreciated. Thanks.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
>>>>>>>>>>
>>>>>>>>>> In my structured streaming job I am updating Spark Accumulators
>>>>>>>>>> in the updateAcrossEvents method but they are always 0 when I try to 
>>>>>>>>>> print
>>>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>>>
>>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>> updateAcrossEvents
>>>>>>>>>>   )
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>>>>>> StreamingListener which writes values of the accumulators in
>>>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are 
>>>>>>>>>> ALWAYS
>>>>>>>>>> ZERO!
>>>>>>>>>>
>>>>>>>>>> When I added log statements in the updateAcrossEvents, I could
>>>>>>>>>> see that these accumulators are getting incremented as expected.
>>>>>>>>>>
>>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
>>>>>>>>>> it works fine which implies that the Accumulators are not getting
>>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>>
>>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>>> SparkContext.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>
>>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>>
>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>>>>>
>>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>>> should be atomic or thread safe. I'm wondering if the implementation 
>>>>>>>>> for
>>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>>>>> replace
>>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>>>>> LongAccumulator[3]
>>>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>>>
>>>>>>>>> ---
>>>>>>>>> Cheers,
>>>>>>>>> -z
>>>>>>>>> [1]
>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>> [2]
>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>> [3]
>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>
>>>>>>>>> 
>>>>>>>>> From: Something Something 
>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>> To: spark-user
>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>
>>>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>>>> functionality is supported and tested? I've spent a lot of time on 
>>>>>>>>> this but
>>>>>>>>> can't get it to work. Just to add more context, we've our own 
>>>>>>>>> Accumulator
>>>>>>>>> class that extends from AccumulatorV2. In this class we keep track of 
>>>>>>>>> one
>>>>>>>>> or more accumulators. Here's the definition:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>
>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>
>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>
>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>
>>>>>>>>> I will keep looking for alternate approaches but any help would be
>>>>>>>>> greatly appreciated. Thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
>>>>>>>>>
>>>>>>>>> In my structured streaming job I am updating Spark Accumulators in
>>>>>>>>> the updateAcrossEvents method but they are always 0 when I try to 
>>>>>>>>> print
>>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>>
>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>> updateAcrossEvents
>>>>>>>>>   )
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>>>>> StreamingListener which writes values of the accumulators in
>>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are 
>>>>>>>>> ALWAYS
>>>>>>>>> ZERO!
>>>>>>>>>
>>>>>>>>> When I added log statements in the updateAcrossEvents, I could see
>>>>>>>>> that these accumulators are getting incremented as expected.
>>>>>>>>>
>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
>>>>>>>>> it works fine which implies that the Accumulators are not getting
>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>
>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>> SparkContext.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
gt; >
> > > And the accumulator value updated can be found from driver stdout.
> > >
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 17:12:48 +0530
> > > Srinivas V  wrote:
> > >
> > > > yes, I am using stateful structured streaming. Yes similar to what
> you
> > > do.
> > > > This is in Java
> > > > I do it this way:
> > > > Dataset productUpdates = watermarkedDS
> > > > .groupByKey(
> > > > (MapFunction) event ->
> > > > event.getId(), Encoders.STRING())
> > > > .mapGroupsWithState(
> > > > new
> > > > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > appConfig, accumulators),
> > > > Encoders.bean(ModelStateInfo.class),
> > > > Encoders.bean(ModelUpdate.class),
> > > > GroupStateTimeout.ProcessingTimeTimeout());
> > > >
> > > > StateUpdateTask contains the update method.
> > > >
> > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > > mailinglist...@gmail.com> wrote:
> > > >
> > > > > Yes, that's exactly how I am creating them.
> > > > >
> > > > > Question... Are you using 'Stateful Structured Streaming' in which
> > > you've
> > > > > something like this?
> > > > >
> > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > updateAcrossEvents
> > > > >   )
> > > > >
> > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > > experiencing this only under 'Stateful Structured Streaming'. In other
> > > streaming applications it works as expected.
> > > > >
> > > > >
> > > > >
> > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V 
> > > wrote:
> > > > >
> > > > >> Yes, I am talking about Application specific Accumulators.
> Actually I
> > > am
> > > > >> getting the values printed in my driver log as well as sent to
> > > Grafana. Not
> > > > >> sure where and when I saw 0 before. My deploy mode is “client” on
> a
> > > yarn
> > > > >> cluster(not local Mac) where I submit from master node. It should
> > > work the
> > > > >> same for cluster mode as well.
> > > > >> Create accumulators like this:
> > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > > >>
> > > > >>
> > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > > >> mailinglist...@gmail.com> wrote:
> > > > >>
> > > > >>> Hmm... how would they go to Grafana if they are not getting
> > > computed in
> > > > >>> your code? I am talking about the Application Specific
> Accumulators.
> > > The
> > > > >>> other standard counters such as
> 'event.progress.inputRowsPerSecond'
> > > are
> > > > >>> getting populated correctly!
> > > > >>>
> > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
> > > wrote:
> > > > >>>
> > > > >>>> Hello,
> > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > > cluster.
> > > > >>>> But one consolation is that when I send metrics to Grafana, the
> > > values
> > > > >>>> are coming there.
> > > > >>>>
> > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > > >>>> mailinglist...@gmail.com> wrote:
> > > > >>>>
> > > > >>>>> No this is not working even if I use LongAccumulator.
> > > > >>>>>
> > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  >
> > > wrote:
> > > > >>>>>
> > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > > >>>>>> should be atomic or thread safe.

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Srinivas V
Yes, it is an application-specific class. This is how Java Spark Functions work.
You can refer to this code in the documentation:
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java

public class StateUpdateTask implements MapGroupsWithStateFunction {

@Override
public ModelUpdate call(String productId, Iterator eventsIterator, GroupState state) {
}
}
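
For readers of the archive: the HTML processing stripped the Java generics from the snippets in this thread, so the signatures above look raw. Below is a minimal sketch of what a fully typed version might look like. It assumes `Event`, `ModelStateInfo`, and `ModelUpdate` are the bean classes named in the thread (their constructors here are illustrative), and that a driver-registered `LongAccumulator` is handed in through the constructor; the poster's actual `appConfig` and `accumulators` types are never shown.

```
import java.util.Iterator;

import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.util.LongAccumulator;

public class StateUpdateTask
        implements MapGroupsWithStateFunction<String, Event, ModelStateInfo, ModelUpdate> {

    private final long stateTimeoutMs;
    // Assumed: created and registered on the driver, captured here so that
    // executor tasks can increment it from call().
    private final LongAccumulator eventCounter;

    public StateUpdateTask(long stateTimeoutMs, LongAccumulator eventCounter) {
        this.stateTimeoutMs = stateTimeoutMs;
        this.eventCounter = eventCounter;
    }

    @Override
    public ModelUpdate call(String productId, Iterator<Event> events,
                            GroupState<ModelStateInfo> state) {
        ModelStateInfo info = state.exists() ? state.get() : new ModelStateInfo();
        while (events.hasNext()) {
            Event event = events.next();
            // ... fold `event` into the per-key state here ...
            eventCounter.add(1); // incremented on the executor, merged back on the driver
        }
        state.update(info);
        state.setTimeoutDuration(stateTimeoutMs); // valid with ProcessingTimeTimeout
        return new ModelUpdate(productId, info);
    }
}
```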

On Thu, May 28, 2020 at 10:59 PM Something Something <
mailinglist...@gmail.com> wrote:

> I am assuming StateUpdateTask is your application specific class. Does it
> have 'updateState' method or something? I googled but couldn't find any
> documentation about doing it this way. Can you please direct me to some
> documentation. Thanks.
>
> On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:
>
>> yes, I am using stateful structured streaming. Yes similar to what you
>> do. This is in Java
>> I do it this way:
>> Dataset productUpdates = watermarkedDS
>> .groupByKey(
>> (MapFunction) event ->
>> event.getId(), Encoders.STRING())
>> .mapGroupsWithState(
>> new
>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>> appConfig, accumulators),
>> Encoders.bean(ModelStateInfo.class),
>> Encoders.bean(ModelUpdate.class),
>> GroupStateTimeout.ProcessingTimeTimeout());
>>
>> StateUpdateTask contains the update method.
>>
>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Yes, that's exactly how I am creating them.
>>>
>>> Question... Are you using 'Stateful Structured Streaming' in which
>>> you've something like this?
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> updateAcrossEvents
>>>   )
>>>
>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>> streaming applications it works as expected.
>>>
>>>
>>>
>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
>>>
>>>> Yes, I am talking about Application specific Accumulators. Actually I
>>>> am getting the values printed in my driver log as well as sent to Grafana.
>>>> Not sure where and when I saw 0 before. My deploy mode is “client” on a
>>>> yarn cluster(not local Mac) where I submit from master node. It should work
>>>> the same for cluster mode as well.
>>>> Create accumulators like this:
>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>
>>>>
>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> Hmm... how would they go to Grafana if they are not getting computed
>>>>> in your code? I am talking about the Application Specific Accumulators. 
>>>>> The
>>>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>>>> getting populated correctly!
>>>>>
>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>>> But one consolation is that when I send metrics to Grafana, the
>>>>>> values are coming there.
>>>>>>
>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>
>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>
>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>>>>
>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>> should be atomic or thread safe. I'm wondering if the implementation 
>>>>>>>> for
>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>>>> replace
>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>>>> LongAccumulator[3] and test if the StreamingListener and other codes
>>>>>>>> are able to work?

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread ZHANG Wei
> > > .mapGroupsWithState(
> > > new
> > >
> > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > appConfig, accumulators),
> > > Encoders.bean(ModelStateInfo.class),
> > > Encoders.bean(ModelUpdate.class),
> > > GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > StateUpdateTask contains the update method.
> > >
> > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > mailinglist...@gmail.com> wrote:
> > >
> > > > Yes, that's exactly how I am creating them.
> > > >
> > > > Question... Are you using 'Stateful Structured Streaming' in which
> > you've
> > > > something like this?
> > > >
> > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > updateAcrossEvents
> > > >   )
> > > >
> > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > experiencing this only under 'Stateful Structured Streaming'. In other
> > streaming applications it works as expected.
> > > >
> > > >
> > > >
> > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V 
> > wrote:
> > > >
> > > >> Yes, I am talking about Application specific Accumulators. Actually I
> > am
> > > >> getting the values printed in my driver log as well as sent to
> > Grafana. Not
> > > >> sure where and when I saw 0 before. My deploy mode is “client” on a
> > yarn
> > > >> cluster(not local Mac) where I submit from master node. It should
> > work the
> > > >> same for cluster mode as well.
> > > >> Create accumulators like this:
> > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > >>
> > > >>
> > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > >> mailinglist...@gmail.com> wrote:
> > > >>
> > >>> Hmm... how would they go to Grafana if they are not getting
> > computed in
> > > >>> your code? I am talking about the Application Specific Accumulators.
> > The
> > > >>> other standard counters such as 'event.progress.inputRowsPerSecond'
> > are
> > > >>> getting populated correctly!
> > > >>>
> > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
> > wrote:
> > > >>>
> > > >>>> Hello,
> > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > cluster.
> > >>>> But one consolation is that when I send metrics to Grafana, the
> > values
> > > >>>> are coming there.
> > > >>>>
> > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > >>>> mailinglist...@gmail.com> wrote:
> > > >>>>
> > > >>>>> No this is not working even if I use LongAccumulator.
> > > >>>>>
> > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei 
> > wrote:
> > > >>>>>
> > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > should
> > > >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance
> > to replace
> > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > LongAccumulator[3]
> > > >>>>>> and test if the StreamingListener and other codes are able to
> > work?
> > > >>>>>>
> > > >>>>>> ---
> > > >>>>>> Cheers,
> > > >>>>>> -z
> > > >>>>>> [1]
> > > >>>>>>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> > > >>>>>> [2]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Something Something
I am assuming StateUpdateTask is your application specific class. Does it
have 'updateState' method or something? I googled but couldn't find any
documentation about doing it this way. Can you please direct me to some
documentation. Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
> Dataset productUpdates = watermarkedDS
> .groupByKey(
> (MapFunction) event ->
> event.getId(), Encoders.STRING())
> .mapGroupsWithState(
> new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
> Encoders.bean(ModelStateInfo.class),
> Encoders.bean(ModelUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
>
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Yes, that's exactly how I am creating them.
>>
>> Question... Are you using 'Stateful Structured Streaming' in which you've
>> something like this?
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>
>> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing 
>> this only under 'Stateful Structured Streaming'. In other streaming 
>> applications it works as expected.
>>
>>
>>
>> On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
>>
>>> Yes, I am talking about Application specific Accumulators. Actually I am
>>> getting the values printed in my driver log as well as sent to Grafana. Not
>>> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>>> cluster(not local Mac) where I submit from master node. It should work the
>>> same for cluster mode as well.
>>> Create accumulators like this:
>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>
>>>
>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> Hmm... how would they go to Grafana if they are not getting computed
>>>> in your code? I am talking about the Application Specific Accumulators. The
>>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>>> getting populated correctly!
>>>>
>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>>>>
>>>>> Hello,
>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>> But one consolation is that when I send metrics to Grafana, the
>>>>> values are coming there.
>>>>>
>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>> mailinglist...@gmail.com> wrote:
>>>>>
>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>
>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>>>
>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
>>>>>>> be atomic or thread safe. I'm wondering if the implementation for
>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>>> replace
>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>>> LongAccumulator[3]
>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>
>>>>>>> ---
>>>>>>> Cheers,
>>>>>>> -z
>>>>>>> [1]
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>> [2]
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>> [3]
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>
>>>>>>> 
>>>>>>> From: Something Something 
>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>> To: spark-user
>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Srinivas V
> >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > >> mailinglist...@gmail.com> wrote:
> > >>
> > >>> Hmm... how would they go to Grafana if they are not getting
> computed in
> > >>> your code? I am talking about the Application Specific Accumulators.
> The
> > >>> other standard counters such as 'event.progress.inputRowsPerSecond'
> are
> > >>> getting populated correctly!
> > >>>
> > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
> wrote:
> > >>>
> > >>>> Hello,
> > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> cluster.
> > >>>> But one consolation is that when I send metrics to Grafana, the
> values
> > >>>> are coming there.
> > >>>>
> > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > >>>> mailinglist...@gmail.com> wrote:
> > >>>>
> > >>>>> No this is not working even if I use LongAccumulator.
> > >>>>>
> > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei 
> wrote:
> > >>>>>
> > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> should
> > >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance
> to replace
> > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> LongAccumulator[3]
> > >>>>>> and test if the StreamingListener and other codes are able to
> work?
> > >>>>>>
> > >>>>>> ---
> > >>>>>> Cheers,
> > >>>>>> -z
> > >>>>>> [1]
> > >>>>>>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> > >>>>>> [2]
> > >>>>>>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> > >>>>>> [3]
> > >>>>>>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> > >>>>>>
> > >>>>>> 
> > >>>>>> From: Something Something 
> > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > >>>>>> To: spark-user
> > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> > >>>>>>
> > >>>>>> Can someone from Spark Development team tell me if this
> functionality
> > >>>>>> is supported and tested? I've spent a lot of time on this but
> can't get it
> > >>>>>> to work. Just to add more context, we've our own Accumulator
> class that
> > >>>>>> extends from AccumulatorV2. In this class we keep track of one or
> more
> > >>>>>> accumulators. Here's the definition:
> > >>>>>>
> > >>>>>>
> > >>>>>> class CollectionLongAccumulator[T]
> > >>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
> > >>>>>>
> > >>>>>> When the job begins we register an instance of this class:
> > >>>>>>
> > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > >>>>>>
> > >>>>>> Is this working under Structured Streaming?
> > >>>>>>
> > >>>>>> I will keep looking for alternate approaches but any help would be
> > >>>>>> greatly appreciated. Thanks.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > >>>>>> mailinglist...@gmail.com> wrote:
> > >>>>>>
> > >>>>>> In my structured streaming job I am updating Spark Accumulators in
> > >>>>>> the updateAcrossEvents method but they are always 0 when I try to
> print
> > >>>>>> them in my StreamingListener. Here's the code:
> > >>>>>>
> > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > >>>>>> updateAcrossEvents
> > >>>>>>   )
> > >>>>>>
> > >>>>>>
> > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> > >>>>>> StreamingListener which writes values of the accumulators in
> > >>>>>> 'onQueryProgress' method but in this method the Accumulators are
> ALWAYS
> > >>>>>> ZERO!
> > >>>>>>
> > >>>>>> When I added log statements in the updateAcrossEvents, I could see
> > >>>>>> that these accumulators are getting incremented as expected.
> > >>>>>>
> > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
> it
> > >>>>>> works fine which implies that the Accumulators are not getting
> distributed
> > >>>>>> correctly - or something like that!
> > >>>>>>
> > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > >>>>>> perform an "Action". That's not a solution here. This is a
> 'Stateful
> > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> > >>>>>> SparkContext.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
>


Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread ZHANG Wei
> >>>>>> [2]
> >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> >>>>>> [3]
> >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> >>>>>>
> >>>>>> 
> >>>>>> From: Something Something 
> >>>>>> Sent: Saturday, May 16, 2020 0:38
> >>>>>> To: spark-user
> >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> >>>>>>
> >>>>>> Can someone from Spark Development team tell me if this functionality
> >>>>>> is supported and tested? I've spent a lot of time on this but can't 
> >>>>>> get it
> >>>>>> to work. Just to add more context, we've our own Accumulator class that
> >>>>>> extends from AccumulatorV2. In this class we keep track of one or more
> >>>>>> accumulators. Here's the definition:
> >>>>>>
> >>>>>>
> >>>>>> class CollectionLongAccumulator[T]
> >>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
> >>>>>>
> >>>>>> When the job begins we register an instance of this class:
> >>>>>>
> >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> >>>>>>
> >>>>>> Is this working under Structured Streaming?
> >>>>>>
> >>>>>> I will keep looking for alternate approaches but any help would be
> >>>>>> greatly appreciated. Thanks.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> >>>>>> mailinglist...@gmail.com> wrote:
> >>>>>>
> >>>>>> In my structured streaming job I am updating Spark Accumulators in
> >>>>>> the updateAcrossEvents method but they are always 0 when I try to print
> >>>>>> them in my StreamingListener. Here's the code:
> >>>>>>
> >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> >>>>>> updateAcrossEvents
> >>>>>>   )
> >>>>>>
> >>>>>>
> >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> >>>>>> StreamingListener which writes values of the accumulators in
> >>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> >>>>>> ZERO!
> >>>>>>
> >>>>>> When I added log statements in the updateAcrossEvents, I could see
> >>>>>> that these accumulators are getting incremented as expected.
> >>>>>>
> >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
> >>>>>> works fine which implies that the Accumulators are not getting 
> >>>>>> distributed
> >>>>>> correctly - or something like that!
> >>>>>>
> >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
> >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> >>>>>> SparkContext.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>




Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Srinivas V
yes, I am using stateful structured streaming. Yes similar to what you do.
This is in Java
I do it this way:
Dataset productUpdates = watermarkedDS
.groupByKey(
(MapFunction) event ->
event.getId(), Encoders.STRING())
.mapGroupsWithState(
new
StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
appConfig, accumulators),
Encoders.bean(ModelStateInfo.class),
Encoders.bean(ModelUpdate.class),
GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <
mailinglist...@gmail.com> wrote:

> Yes, that's exactly how I am creating them.
>
> Question... Are you using 'Stateful Structured Streaming' in which you've
> something like this?
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing 
> this only under 'Stateful Structured Streaming'. In other streaming 
> applications it works as expected.
>
>
>
> On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
>
>> Yes, I am talking about Application specific Accumulators. Actually I am
>> getting the values printed in my driver log as well as sent to Grafana. Not
>> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>> cluster(not local Mac) where I submit from master node. It should work the
>> same for cluster mode as well.
>> Create accumulators like this:
>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>
>>
>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Hmm... how would they go to Grafana if they are not getting computed in
>>> your code? I am talking about the Application Specific Accumulators. The
>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>> getting populated correctly!
>>>
>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>>>
>>>> Hello,
>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>> But one consolation is that when I send metrics to Grafana, the values
>>>> are coming there.
>>>>
>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> No this is not working even if I use LongAccumulator.
>>>>>
>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>>
>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
>>>>>> be atomic or thread safe. I'm wondering if the implementation for
>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>> replace
>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>> LongAccumulator[3]
>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>
>>>>>> ---
>>>>>> Cheers,
>>>>>> -z
>>>>>> [1]
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>> [2]
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>> [3]
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>
>>>>>> 
>>>>>> From: Something Something 
>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>> To: spark-user
>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>
>>>>>> Can someone from Spark Development team tell me if this functionality
>>>>>> is supported and tested? I've spent a lot of time on this but can't get 
>>>>>> it
>>>>>> to work. Just to add more context, we've our own Accumulator class that
>>>>>> extends from AccumulatorV2. In this class we keep track of one or more
>>>>>> accumulators. Here's the definition:
>>>>>>
>>>>>> class CollectionLongAccumulator[T]
>>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]

Re: Using Spark Accumulators with Structured Streaming

2020-05-27 Thread Something Something
Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming' in which you've
something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
  )

And updating the Accumulator inside 'updateAcrossEvents'? We're
experiencing this only under 'Stateful Structured Streaming'. In other
streaming applications it works as expected.



On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:

> Yes, I am talking about Application specific Accumulators. Actually I am
> getting the values printed in my driver log as well as sent to Grafana. Not
> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> cluster(not local Mac) where I submit from master node. It should work the
> same for cluster mode as well.
> Create accumulators like this:
> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>
>
> On Tue, May 26, 2020 at 8:42 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Hmm... how would they go to Grafana if they are not getting computed in
>> your code? I am talking about the Application Specific Accumulators. The
>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>> getting populated correctly!
>>
>> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>>
>>> Hello,
>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>> But one consolation is that when I send metrics to Grafana, the values
>>> are coming there.
>>>
>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> No this is not working even if I use LongAccumulator.
>>>>
>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>
>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
>>>>> be atomic or thread safe. I'm wondering if the implementation for
>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>> replace
>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>> LongAccumulator[3]
>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>
>>>>> ---
>>>>> Cheers,
>>>>> -z
>>>>> [1]
>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>> [2]
>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>> [3]
>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>
>>>>> 
>>>>> From: Something Something 
>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>> To: spark-user
>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>
>>>>> Can someone from Spark Development team tell me if this functionality
>>>>> is supported and tested? I've spent a lot of time on this but can't get it
>>>>> to work. Just to add more context, we've our own Accumulator class that
>>>>> extends from AccumulatorV2. In this class we keep track of one or more
>>>>> accumulators. Here's the definition:
>>>>>
>>>>>
>>>>> class CollectionLongAccumulator[T]
>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>
>>>>> When the job begins we register an instance of this class:
>>>>>
>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>
>>>>> Is this working under Structured Streaming?
>>>>>
>>>>> I will keep looking for alternate approaches but any help would be
>>>>> greatly appreciated. Thanks.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>> mailinglist...@gmail.com> wrote:
>>>>>
>>>>> In my structured streaming job I am updating Spark Accumulators in the
>>>>> updateAcrossEvents method but they are always 0 when I try to print them 
>>>>> in
>>>>> my StreamingListener. Here's the code:
>>>>>
>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>> updateAcrossEvents
>>>>>   )
>>>>>
>>>>>
>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>> StreamingListener which writes values of the accumulators in
>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>> ZERO!
>>>>>
>>>>> When I added log statements in the updateAcrossEvents, I could see
>>>>> that these accumulators are getting incremented as expected.
>>>>>
>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>>>> works fine which implies that the Accumulators are not getting distributed
>>>>> correctly - or something like that!
>>>>>
>>>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>>>> an "Action". That's not a solution here. This is a 'Stateful Structured
>>>>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>>>>
>>>>>
>>>>>
>>>>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-27 Thread Srinivas V
Yes, I am talking about Application specific Accumulators. Actually I am
getting the values printed in my driver log as well as sent to Grafana. Not
sure where and when I saw 0 before. My deploy mode is “client” on a yarn
cluster(not local Mac) where I submit from master node. It should work the
same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <
mailinglist...@gmail.com> wrote:

> Hmm... how would they go to Grafana if they are not getting computed in
> your code? I am talking about the Application Specific Accumulators. The
> other standard counters such as 'event.progress.inputRowsPerSecond' are
> getting populated correctly!
>
> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>
>> Hello,
>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>> But one consolation is that when I send metrics to Grafana, the values
>> are coming there.
>>
>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> No this is not working even if I use LongAccumulator.
>>>
>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>
>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should be
>>>> atomic or thread safe. I'm wondering if the implementation for
>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>>>> and test if the StreamingListener and other codes are able to work?
>>>>
>>>> ---
>>>> Cheers,
>>>> -z
>>>> [1]
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>> [2]
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>> [3]
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>
>>>> 
>>>> From: Something Something 
>>>> Sent: Saturday, May 16, 2020 0:38
>>>> To: spark-user
>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>
>>>> Can someone from Spark Development team tell me if this functionality
>>>> is supported and tested? I've spent a lot of time on this but can't get it
>>>> to work. Just to add more context, we've our own Accumulator class that
>>>> extends from AccumulatorV2. In this class we keep track of one or more
>>>> accumulators. Here's the definition:
>>>>
>>>>
>>>> class CollectionLongAccumulator[T]
>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>
>>>> When the job begins we register an instance of this class:
>>>>
>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>
>>>> Is this working under Structured Streaming?
>>>>
>>>> I will keep looking for alternate approaches but any help would be
>>>> greatly appreciated. Thanks.
>>>>
>>>>
>>>>
>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>> In my structured streaming job I am updating Spark Accumulators in the
>>>> updateAcrossEvents method but they are always 0 when I try to print them in
>>>> my StreamingListener. Here's the code:
>>>>
>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>> updateAcrossEvents
>>>>   )
>>>>
>>>>
>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>> StreamingListener which writes values of the accumulators in
>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>> ZERO!
>>>>
>>>> When I added log statements in the updateAcrossEvents, I could see that
>>>> these accumulators are getting incremented as expected.
>>>>
>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>>> works fine which implies that the Accumulators are not getting distributed
>>>> correctly - or something like that!
>>>>
>>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>>> an "Action". That's not a solution here. This is a 'Stateful Structured
>>>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>>>
>>>>
>>>>
>>>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-26 Thread Something Something
Hmm... how would they go to Grafana if they are not getting computed in
your code? I am talking about the Application Specific Accumulators. The
other standard counters such as 'event.progress.inputRowsPerSecond' are
getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:

> Hello,
> Even for me it comes as 0 when I print in OnQueryProgress. I use
> LongAccumulator as well. Yes, it prints on my local but not on cluster.
> But one consolation is that when I send metrics to Grafana, the values
> are coming there.
>
> On Tue, May 26, 2020 at 3:10 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> No this is not working even if I use LongAccumulator.
>>
>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>
>>> There is a restriction in AccumulatorV2 API [1], the OUT type should be
>>> atomic or thread safe. I'm wondering if the implementation for
>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>>> and test if the StreamingListener and other codes are able to work?
>>>
>>> ---
>>> Cheers,
>>> -z
>>> [1]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>> [2]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>> [3]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>
>>> 
>>> From: Something Something 
>>> Sent: Saturday, May 16, 2020 0:38
>>> To: spark-user
>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>
>>> Can someone from Spark Development team tell me if this functionality is
>>> supported and tested? I've spent a lot of time on this but can't get it to
>>> work. Just to add more context, we've our own Accumulator class that
>>> extends from AccumulatorV2. In this class we keep track of one or more
>>> accumulators. Here's the definition:
>>>
>>>
>>> class CollectionLongAccumulator[T]
>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>
>>> When the job begins we register an instance of this class:
>>>
>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>
>>> Is this working under Structured Streaming?
>>>
>>> I will keep looking for alternate approaches but any help would be
>>> greatly appreciated. Thanks.
>>>
>>>
>>>
>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>> In my structured streaming job I am updating Spark Accumulators in the
>>> updateAcrossEvents method but they are always 0 when I try to print them in
>>> my StreamingListener. Here's the code:
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> updateAcrossEvents
>>>   )
>>>
>>>
>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>> StreamingListener which writes values of the accumulators in
>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>> ZERO!
>>>
>>> When I added log statements in the updateAcrossEvents, I could see that
>>> these accumulators are getting incremented as expected.
>>>
>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>> works fine which implies that the Accumulators are not getting distributed
>>> correctly - or something like that!
>>>
>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>> an "Action". That's not a solution here. This is a 'Stateful Structured
>>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>>
>>>
>>>
>>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Srinivas V
Hello,
Even for me it comes as 0 when I print in OnQueryProgress. I use
LongAccumulator as well. Yes, it prints on my local but not on cluster.
But one consolation is that when I send metrics to Grafana, the values are
coming there.

On Tue, May 26, 2020 at 3:10 AM Something Something <
mailinglist...@gmail.com> wrote:

> No this is not working even if I use LongAccumulator.
>
> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>
>> There is a restriction in AccumulatorV2 API [1], the OUT type should be
>> atomic or thread safe. I'm wondering if the implementation for
>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>> and test if the StreamingListener and other codes are able to work?
>>
>> ---
>> Cheers,
>> -z
>> [1]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>> [2]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>> [3]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>
>> ________
>> From: Something Something 
>> Sent: Saturday, May 16, 2020 0:38
>> To: spark-user
>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>
>> Can someone from Spark Development team tell me if this functionality is
>> supported and tested? I've spent a lot of time on this but can't get it to
>> work. Just to add more context, we've our own Accumulator class that
>> extends from AccumulatorV2. In this class we keep track of one or more
>> accumulators. Here's the definition:
>>
>>
>> class CollectionLongAccumulator[T]
>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>
>> When the job begins we register an instance of this class:
>>
>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>
>> Is this working under Structured Streaming?
>>
>> I will keep looking for alternate approaches but any help would be
>> greatly appreciated. Thanks.
>>
>>
>>
>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>> In my structured streaming job I am updating Spark Accumulators in the
>> updateAcrossEvents method but they are always 0 when I try to print them in
>> my StreamingListener. Here's the code:
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>
>>
>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>> StreamingListener which writes values of the accumulators in
>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>> ZERO!
>>
>> When I added log statements in the updateAcrossEvents, I could see that
>> these accumulators are getting incremented as expected.
>>
>> This only happens when I run in the 'Cluster' mode. In Local mode it
>> works fine which implies that the Accumulators are not getting distributed
>> correctly - or something like that!
>>
>> Note: I've seen quite a few answers on the Web that tell me to perform an
>> "Action". That's not a solution here. This is a 'Stateful Structured
>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>
>>
>>
>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Something Something
No this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:

> There is a restriction in AccumulatorV2 API [1], the OUT type should be
> atomic or thread safe. I'm wondering if the implementation for
> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
> and test if the StreamingListener and other codes are able to work?
>
> ---
> Cheers,
> -z
> [1]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>
> 
> From: Something Something 
> Sent: Saturday, May 16, 2020 0:38
> To: spark-user
> Subject: Re: Using Spark Accumulators with Structured Streaming
>
> Can someone from Spark Development team tell me if this functionality is
> supported and tested? I've spent a lot of time on this but can't get it to
> work. Just to add more context, we've our own Accumulator class that
> extends from AccumulatorV2. In this class we keep track of one or more
> accumulators. Here's the definition:
>
>
> class CollectionLongAccumulator[T]
> extends AccumulatorV2[T, java.util.Map[T, Long]]
>
> When the job begins we register an instance of this class:
>
> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>
> Is this working under Structured Streaming?
>
> I will keep looking for alternate approaches but any help would be greatly
> appreciated. Thanks.
>
>
>
> On Thu, May 14, 2020 at 2:36 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method but they are always 0 when I try to print them in
> my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
>
> The accumulators get incremented in 'updateAcrossEvents'. I've a
> StreamingListener which writes values of the accumulators in
> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> ZERO!
>
> When I added log statements in the updateAcrossEvents, I could see that
> these accumulators are getting incremented as expected.
>
> This only happens when I run in the 'Cluster' mode. In Local mode it works
> fine which implies that the Accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform an
> "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>
>
>
>


Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread ZHANG Wei
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic 
or thread safe. I'm wondering if the implementation for `java.util.Map[T, 
Long]` can meet it or not. Is there any chance to replace 
CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and 
test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
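
To make the suggestion concrete, here is a sketch (in Java, to match the other snippets in this thread, and not the poster's actual CollectionLongAccumulator) of a per-key counting accumulator whose OUT type is backed by a thread-safe map:

```
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.spark.util.AccumulatorV2;

// Counts occurrences per key; the backing map is a ConcurrentHashMap,
// which addresses the "atomic or thread safe" restriction on OUT.
public class ConcurrentCountAccumulator<T> extends AccumulatorV2<T, Map<T, Long>> {

    private final ConcurrentHashMap<T, Long> counts = new ConcurrentHashMap<>();

    @Override
    public boolean isZero() { return counts.isEmpty(); }

    @Override
    public AccumulatorV2<T, Map<T, Long>> copy() {
        ConcurrentCountAccumulator<T> copy = new ConcurrentCountAccumulator<>();
        copy.counts.putAll(counts);
        return copy;
    }

    @Override
    public void reset() { counts.clear(); }

    @Override
    public void add(T key) { counts.merge(key, 1L, Long::sum); }

    @Override
    public void merge(AccumulatorV2<T, Map<T, Long>> other) {
        other.value().forEach((k, v) -> counts.merge(k, v, Long::sum));
    }

    @Override
    public Map<T, Long> value() {
        // Return a snapshot rather than the live map.
        return new HashMap<>(counts);
    }
}
```

It would be registered the same way as the original, e.g. spark.sparkContext().register(new ConcurrentCountAccumulator<String>(), "MyAccumulator").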


From: Something Something 
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is 
supported and tested? I've spent a lot of time on this but can't get it to 
work. Just to add more context, we've our own Accumulator class that extends 
from AccumulatorV2. In this class we keep track of one or more accumulators. 
Here's the definition:


class CollectionLongAccumulator[T]
extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly 
appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <
mailinglist...@gmail.com> wrote:

In my structured streaming job I am updating Spark Accumulators in the 
updateAcrossEvents method but they are always 0 when I try to print them in my 
StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
  )


The accumulators get incremented in 'updateAcrossEvents'. I've a 
StreamingListener which writes values of the accumulators in 'onQueryProgress' 
method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these 
accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine 
which implies that the Accumulators are not getting distributed correctly - or 
something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an 
"Action". That's not a solution here. This is a 'Stateful Structured Streaming' 
job. Yes, I am also 'registering' them in SparkContext.







Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread Something Something
Can someone from Spark Development team tell me if this functionality is
supported and tested? I've spent a lot of time on this but can't get it
to work. Just to add more context, we've our own Accumulator class that
extends from AccumulatorV2. In this class we keep track of one or more
accumulators. Here's the definition:

class CollectionLongAccumulator[T]
extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be
greatly appreciated. Thanks.




On Thu, May 14, 2020 at 2:36 PM Something Something <
mailinglist...@gmail.com> wrote:

> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method but they are always 0 when I try to print them in
> my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
> The accumulators get incremented in 'updateAcrossEvents'. I've a
> StreamingListener which writes values of the accumulators in
> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> ZERO!
>
> When I added log statements in the updateAcrossEvents, I could see that
> these accumulators are getting incremented as expected.
>
> This only happens when I run in the 'Cluster' mode. In Local mode it works
> fine which implies that the Accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform an
> "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>
>
>
>


Using Spark Accumulators with Structured Streaming

2020-05-14 Thread Something Something
In my structured streaming job I am updating Spark Accumulators in the
updateAcrossEvents method but they are always 0 when I try to print them in
my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
  )

The accumulators get incremented in 'updateAcrossEvents'. I've a
StreamingListener which writes values of the accumulators in
'onQueryProgress' method but in this method the Accumulators are ALWAYS
ZERO!

When I added log statements in the updateAcrossEvents, I could see that
these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works
fine which implies that the Accumulators are not getting distributed
correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an
"Action". That's not a solution here. This is a 'Stateful Structured
Streaming' job. Yes, I am also 'registering' them in SparkContext.
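

For anyone landing on this thread later, a minimal driver-side sketch of the pattern under discussion: a named LongAccumulator created (and thereby registered) on the driver and read back in onQueryProgress. The listener type in Structured Streaming is StreamingQueryListener, added via spark.streams().addListener(), which appears to be what the posters mean by "StreamingListener". Names are illustrative and the stateful query itself is elided:

```
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorListenerSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("accumulator-listener-sketch")
                .getOrCreate();

        // longAccumulator(name) both creates and registers the accumulator.
        LongAccumulator eventCounter =
                spark.sparkContext().longAccumulator("eventCounter");

        // The listener runs on the driver, so the value read here reflects
        // whatever executor tasks have merged back after each micro-batch.
        spark.streams().addListener(new StreamingQueryListener() {
            @Override
            public void onQueryStarted(QueryStartedEvent event) { }

            @Override
            public void onQueryProgress(QueryProgressEvent event) {
                System.out.println("eventCounter = " + eventCounter.value());
            }

            @Override
            public void onQueryTerminated(QueryTerminatedEvent event) { }
        });

        // ... build the stateful streaming query here, passing `eventCounter`
        // into the mapGroupsWithState function, then start() it and
        // awaitTermination() ...
    }
}
```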