Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
...(MapFunction<..., String>) event -> event.getId(), Encoders.STRING()) .mapGroupsWithState(...

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Srinivas V
...appConfig, accumulators), Encoders.bean(ModelStateInfo.class), Encoders.bean(ModelUpdate.class), ...

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
Yes, that's exactly how I am creating them. Question... Are you using 'Stateful Structured Streaming'...

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
...ProcessingTimeTimeout())( updateAcrossEvents ) And updating the Accumulator inside 'updateAcrossEvents'? We...

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Srinivas V
...Specific Accumulators. Actually I am getting the values printed in my driver log as well as sent to Grafana. Not...

Re: Using Spark Accumulators with Structured Streaming

2020-06-07 Thread Something Something
...same for cluster mode as well. Create accumulators like this: AccumulatorV2 accumulator = sparkContext.longAccumulator(name); ...
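
For reference, the registration pattern described above, as a minimal Scala sketch (the app and accumulator names here are illustrative, not taken from the original code):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("acc-registration").getOrCreate()

    // longAccumulator(name) both creates the accumulator and registers it
    // with the SparkContext under that name.
    val recordCount = spark.sparkContext.longAccumulator("recordCount")

    recordCount.add(1)          // add() is normally called from executor-side code
    println(recordCount.value)  // value should only be read on the driver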

Re: Using Spark Accumulators with Structured Streaming

2020-06-03 Thread ZHANG Wei
...your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!...
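
For contrast with the application-specific accumulators, the standard counter mentioned here is read straight off the progress event. A minimal listener sketch (the class name is made up):

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

    class ProgressLogger extends StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // Built-in per-trigger metric, populated by the engine itself.
        println(s"inputRowsPerSecond = ${event.progress.inputRowsPerSecond}")
      }
    }

It is registered with spark.streams.addListener(new ProgressLogger).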

Re: Using Spark Accumulators with Structured Streaming

2020-06-01 Thread ZHANG Wei
...Even for me it comes as 0 when I print in OnQueryProgress. I use LongAccumulator as well. Yes, it prints on my local but not on cluster. But one consolation is...

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Srinivas V
...computed in your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!...

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
...On Mon, May 25, 2020 at 8:39 PM Srinivas V wrote: Hello, Even for me it comes as 0 when I print in OnQuery...

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Srinivas V
...LongAccumulator as well. Yes, it prints on my local but not on cluster. But one consolation is that when I send metrics to Grafana, the values are coming there. On Tue, May 26, 2020 at 3:10 AM Somet...

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
...May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote: No this is not working even if I use LongAccumulator. On Fri, May 15, 2020 at 9:54 PM ZHANG Wei wrote: ...

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
...mailinglist...@gmail.com> wrote: No this is not working even if I use LongAccumulator. On Fri, May 15, 2020 at 9:...

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Srinivas V
...On Fri, May 15, 2020 at 9:54 PM ZHANG Wei wrote: There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation...

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread ZHANG Wei
...safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]...

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Something Something
...`java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener...
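
As a rough sketch of the swap being suggested here -- trying the built-in CollectionAccumulator or LongAccumulator in place of the custom map-based accumulator, just to see whether the listener picks up values -- something like the following (app and accumulator names are made up):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("builtin-acc-test").getOrCreate()
    val sc = spark.sparkContext

    // Built-in alternatives with thread-safe OUT types.
    val longAcc = sc.longAccumulator("eventCount")             // a single counter
    val listAcc = sc.collectionAccumulator[String]("eventIds") // collects values into a list

    longAcc.add(1)
    listAcc.add("some-event-id")

    println(longAcc.value) // java.lang.Long, read on the driver
    println(listAcc.value) // java.util.List[String], read on the driver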

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Srinivas V
...[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2 ...

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread ZHANG Wei
...http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2 ...

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Srinivas V
...to work? --- Cheers, -z [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2 [2]...

Re: Using Spark Accumulators with Structured Streaming

2020-05-27 Thread Something Something
...[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2 [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator ...

Re: Using Spark Accumulators with Structured Streaming

2020-05-27 Thread Srinivas V
...index.html#org.apache.spark.util.AccumulatorV2 [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator ...

Re: Using Spark Accumulators with Structured Streaming

2020-05-26 Thread Something Something
...http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator From: Something Something...

Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Srinivas V
...[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator ________ From: Something Something Sent: Saturday, May 16, 2020 0:38 To: spark-user Subject: Re: Using Spark Accumulators with Structured Streaming...

Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Something Something
...From: Something Something Sent: Saturday, May 16, 2020 0:38 To: spark-user Subject: Re: Using Spark Accumulators with Structured Streaming Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot...

Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread ZHANG Wei
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator From: Something Something Sent: Saturday, May 16, 2020 0:38 To: spark-user Subject: Re: Using Spark Accumulators with Structured Streaming Can someone from Spark Development...

Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread Something Something
Can someone from the Spark development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we have our own Accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators...
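
A minimal sketch of what such a custom accumulator might look like -- a map of named counts, in the spirit of the CollectionLongAccumulator over `java.util.Map[T, Long]` discussed elsewhere in this thread. The class name and the use of ConcurrentHashMap (to keep the OUT type thread safe, as suggested in that discussion) are assumptions, not the original implementation:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._
    import org.apache.spark.util.AccumulatorV2

    // Hypothetical map-valued accumulator: counts keyed by name, merged on the driver.
    class NamedCountAccumulator extends AccumulatorV2[String, ConcurrentHashMap[String, Long]] {
      private val counts = new ConcurrentHashMap[String, Long]()

      override def isZero: Boolean = counts.isEmpty

      override def copy(): NamedCountAccumulator = {
        val acc = new NamedCountAccumulator
        acc.counts.putAll(counts)
        acc
      }

      override def reset(): Unit = counts.clear()

      // synchronized because get-then-put is a compound update
      override def add(key: String): Unit = synchronized {
        counts.put(key, counts.getOrDefault(key, 0L) + 1L)
      }

      override def merge(other: AccumulatorV2[String, ConcurrentHashMap[String, Long]]): Unit =
        other.value.asScala.foreach { case (k, v) =>
          counts.put(k, counts.getOrDefault(k, 0L) + v)
        }

      override def value: ConcurrentHashMap[String, Long] = counts
    }

An instance still has to be registered on the driver, e.g. sc.register(new NamedCountAccumulator, "namedCounts"), before the streaming query uses it.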

Using Spark Accumulators with Structured Streaming

2020-05-14 Thread Something Something
In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code: .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( updateAcrossEvents ) The accumula...
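
To make the reported setup concrete, here is a minimal, self-contained Scala sketch of the pattern: a LongAccumulator incremented inside an updateAcrossEvents-style function used with mapGroupsWithState, and a StreamingQueryListener that prints it on every progress event. The Event and EventState case classes, the rate source, and all names other than updateAcrossEvents are illustrative assumptions, not the original job:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, StreamingQueryListener}
    import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

    object AccumulatorSketch {
      case class Event(id: String, value: Long)   // stand-in for the real event type
      case class EventState(count: Long)          // stand-in for the real state type

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("acc-sketch").getOrCreate()
        import spark.implicits._

        // Named accumulator registered on the driver.
        val eventCount = spark.sparkContext.longAccumulator("eventCount")

        // Listener that reads the accumulator on every progress update --
        // the read that this thread reports as always returning 0.
        spark.streams.addListener(new StreamingQueryListener {
          override def onQueryStarted(event: QueryStartedEvent): Unit = ()
          override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
          override def onQueryProgress(event: QueryProgressEvent): Unit =
            println(s"eventCount = ${eventCount.value}")
        })

        // State-update function; the accumulator is captured in the closure
        // and incremented on the executors.
        def updateAcrossEvents(id: String, events: Iterator[Event],
                               state: GroupState[EventState]): EventState = {
          val seen = events.size
          eventCount.add(seen)
          val next = EventState(state.getOption.map(_.count).getOrElse(0L) + seen)
          state.update(next)
          next
        }

        val query = spark.readStream.format("rate").load()
          .selectExpr("cast(value as string) as id", "value")
          .as[Event]
          .groupByKey(_.id)
          .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
          .writeStream.format("console").outputMode("update").start()

        query.awaitTermination()
      }
    }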