Ya, I had asked this question before. No one responded. By the way, what's your actual name, "Something Something", if you don't mind me asking?
On Tue, Jun 9, 2020 at 12:27 AM Something Something <mailinglist...@gmail.com> wrote:

What is scary is that this interface is marked as "experimental":

```java
@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}
```

On Mon, Jun 8, 2020 at 11:54 AM Something Something <mailinglist...@gmail.com> wrote:

Right, this is exactly how I have it right now. The problem is that in cluster mode 'myAcc' does NOT get distributed. Try it out in cluster mode & you will see what I mean.

I think the way Zhang is using it will work. Will try & revert.

On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <srini....@gmail.com> wrote:

You don't need to have a separate class. I created one because it has a lot of code and logic in my case. For a quick test you can use Zhang's Scala code from this chain. Pasting it below for your quick reference:

```scala
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println(event.progress.id + " is on progress")
    println(s"My accu is ${myAcc.value} on query progress")
  }
  ...
})

def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
  myAcc.add(1)
  println(s">>> key: $key => state: ${state}")
  ...
}

val wordCounts = words
  .groupByKey(v => ...)
  .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)
  ...
```

On Mon, Jun 8, 2020 at 11:14 AM Something Something <mailinglist...@gmail.com> wrote:

Great. I guess the trick is to use a separate class such as 'StateUpdateTask'. I will try that. My challenge is to convert this into Scala. Will try it out & revert. Thanks for the tips.
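For reference, a minimal Scala sketch of that conversion, modeled on Zhang's Java `StateUpdateTask` quoted further down in this thread. It is untested: `LeadingCharCount` is assumed to be a simple case class, and the elided state computation is filled in with an illustrative running count.

```scala
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.util.LongAccumulator

// Illustrative stand-in for the Java bean in Zhang's example.
case class LeadingCharCount(key: String, count: Long)

// Scala rendering of the StateUpdateTask pattern: the accumulator is passed
// in through the constructor, so executors increment the same
// driver-registered instance.
class StateUpdateTask(myAccInTask: LongAccumulator) extends Serializable {
  def call(key: String, values: Iterator[String], state: GroupState[Long]): LeadingCharCount = {
    myAccInTask.add(1)
    val newState = state.getOption.getOrElse(0L) + values.size // illustrative state update
    state.update(newState)
    LeadingCharCount(key, newState)
  }
}

// Wiring it into the curried Scala API (encoders come from spark.implicits._):
// words.groupByKey(...)
//   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(new StateUpdateTask(myAcc).call _)
```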
On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <wezh...@outlook.com> wrote:

The following Java code works in my cluster environment:

```java
.mapGroupsWithState(
    (MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
        myAcc.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    },
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout())
```

It also works fine with my `StateUpdateTask`:

```java
.mapGroupsWithState(
    new StateUpdateTask(myAcc),
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout());

public class StateUpdateTask
        implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
    private LongAccumulator myAccInTask;

    public StateUpdateTask(LongAccumulator acc) {
        this.myAccInTask = acc;
    }

    @Override
    public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
        myAccInTask.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    }
}
```

--
Cheers,
-z

On Tue, 2 Jun 2020 10:28:36 +0800, ZHANG Wei <wezh...@outlook.com> wrote:

Yes, verified on the cluster with 5 executors.

On Fri, 29 May 2020 11:16:12 -0700, Something Something <mailinglist...@gmail.com> wrote:

Did you try this on the cluster? Note: this works just fine in 'Local' mode.

On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <wezh...@outlook.com> wrote:

I can't reproduce the issue with my simple code:

```scala
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println(event.progress.id + " is on progress")
    println(s"My accu is ${myAcc.value} on query progress")
  }
  ...
})

def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
  myAcc.add(1)
  println(s">>> key: $key => state: ${state}")
  ...
}

val wordCounts = words
  .groupByKey(v => ...)
  .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)
  ...
```

I'm wondering whether any errors can be found in the driver logs? Micro-batch exceptions won't terminate the running streaming job.

For the following code, we have to make sure that `StateUpdateTask` is started:

```java
.mapGroupsWithState(
    new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
        appConfig, accumulators),
    Encoders.bean(ModelStateInfo.class),
    Encoders.bean(ModelUpdate.class),
    GroupStateTimeout.ProcessingTimeTimeout());
```

--
Cheers,
-z
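To make the fragments above concrete, here is a self-contained sketch assembled from the snippets in this thread, with the elided pieces filled in with illustrative choices (a socket source on localhost:9999, word keys, a running count as state). It is an untested assembly, not code from any poster:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StreamingQueryListener}

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("AccumulatorDemo").getOrCreate()
    import spark.implicits._

    // Named accumulator created and registered on the driver.
    val myAcc = spark.sparkContext.longAccumulator("MyAcc")

    // Driver-side listener: reads the accumulator once per micro-batch.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(e: StreamingQueryListener.QueryStartedEvent): Unit = ()
      override def onQueryTerminated(e: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(e: StreamingQueryListener.QueryProgressEvent): Unit =
        println(s"My accu is ${myAcc.value} on query progress")
    })

    // Executor-side stateful function: increments the accumulator per group.
    def mappingFunc(key: String, values: Iterator[String], state: GroupState[Long]): (String, Long) = {
      myAcc.add(1)
      val newState = state.getOption.getOrElse(0L) + values.size
      state.update(newState)
      (key, newState)
    }

    val words = spark.readStream
      .format("socket").option("host", "localhost").option("port", 9999)
      .load().as[String]
      .flatMap(_.split(" "))

    val wordCounts = words
      .groupByKey(identity)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunc _)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```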
On Thu, 28 May 2020 19:59:31 +0530, Srinivas V <srini....@gmail.com> wrote:

Giving the code below (`accumulators` is a class-level variable in the driver):

```java
sparkSession.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        logger.info("Query started: " + queryStarted.id());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        logger.info("Query terminated: " + queryTerminated.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        accumulators.eventsReceived(queryProgress.progress().numInputRows());
        long eventsReceived = 0;
        long eventsExpired = 0;
        long eventSentSuccess = 0;
        try {
            eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
            eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
            eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
        } catch (MissingKeyException e) {
            logger.error("Accumulator key not found due to Exception {}", e.getMessage());
        }
        logger.info("Events Received:{}", eventsReceived);
        logger.info("Events State Expired:{}", eventsExpired);
        logger.info("Events Sent Success:{}", eventSentSuccess);
        logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
            queryProgress.progress().batchId(),
            queryProgress.progress().numInputRows(),
            queryProgress.progress().inputRowsPerSecond(),
            queryProgress.progress().processedRowsPerSecond(),
            queryProgress.progress().durationMs());
    }
});
```
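The `accumulators` object above is application code that isn't shown in this thread. A hypothetical Scala sketch of such a registry might look like this; the names, and the `NoSuchElementException` standing in for the `MissingKeyException` above, are illustrative:

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

// Hypothetical reconstruction of an application-level accumulator registry:
// named LongAccumulators registered once on the driver and shared by the
// stateful tasks and the query listener. SparkContext is @transient because
// only the accumulators themselves should be shipped to executors.
class Accumulators(@transient private val sc: SparkContext) extends Serializable {
  private val byName = new ConcurrentHashMap[String, LongAccumulator]()

  // Driver-only: create and register a named accumulator.
  def register(name: String): LongAccumulator =
    byName.computeIfAbsent(name, n => sc.longAccumulator(n))

  // Usable on executors: increment a previously registered accumulator.
  def add(name: String, delta: Long): Unit = byName.get(name).add(delta)

  // Driver-only: read the current value.
  def getLong(name: String): Long = {
    val acc = byName.get(name)
    if (acc == null) throw new NoSuchElementException(s"No accumulator named $name")
    acc.value
  }
}
```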
On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezh...@outlook.com> wrote:

May I ask how the accumulator is accessed in the `onQueryProgress()` method?

AFAICT, the accumulator is incremented correctly. There is a way to verify that on a cluster, like this:

```scala
// Add the following while loop before invoking awaitTermination
while (true) {
  println("My acc: " + myAcc.value)
  Thread.sleep(5 * 1000)
}

// query.awaitTermination()
```

The updated accumulator value can then be found in the driver's stdout.

--
Cheers,
-z

On Thu, 28 May 2020 17:12:48 +0530, Srinivas V <srini....@gmail.com> wrote:

Yes, I am using stateful Structured Streaming, similar to what you do, but in Java. I do it this way:

```java
Dataset<ModelUpdate> productUpdates = watermarkedDS
    .groupByKey(
        (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
    .mapGroupsWithState(
        new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
            appConfig, accumulators),
        Encoders.bean(ModelStateInfo.class),
        Encoders.bean(ModelUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());
```

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:

Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming', in which you have something like this?

```scala
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
  updateAcrossEvents
)
```

And are you updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.

On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:

Yes, I am talking about application-specific Accumulators. Actually, I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is "client" on a YARN cluster (not a local Mac), where I submit from the master node. It should work the same in cluster mode as well.

Create accumulators like this:

```java
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
```
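Putting the two suggestions above together, a minimal driver-side verification harness might look like this (sketch only; the accumulator name and polling interval are illustrative):

```scala
import org.apache.spark.util.LongAccumulator

// A named accumulator also shows up in the Spark UI, which is another way
// to confirm it is being updated by executors.
val myAcc: LongAccumulator = spark.sparkContext.longAccumulator("MyAcc")

// ... build and start the streaming query here ...

// Poll on the driver instead of blocking in awaitTermination:
while (true) {
  println("My acc: " + myAcc.value)
  Thread.sleep(5 * 1000)
}
// query.awaitTermination()
```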
On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:

Hmm... how would they get to Grafana if they are not being computed in your code? I am talking about the application-specific Accumulators. The other standard counters, such as 'event.progress.inputRowsPerSecond', are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:

Hello,
Even for me it comes out as 0 when I print it in onQueryProgress. I use a LongAccumulator as well. Yes, it prints locally but not on the cluster. One consolation is that when I send metrics to Grafana, the values do show up there.

On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:

No, this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezh...@outlook.com> wrote:

There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread-safe. I'm wondering whether the implementation for `java.util.Map[T, Long]` meets that or not. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the other code work?

--
Cheers,
-z

[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
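Following that suggestion, one way to make a map-valued accumulator thread-safe is to back it with a ConcurrentHashMap. Below is a minimal sketch; the class name and per-key counting semantics are assumptions, since the original CollectionLongAccumulator isn't shown in full:

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.AccumulatorV2

// Sketch of a thread-safe per-key counter: each add(key) increments that
// key's count, and merge() combines per-partition copies on the driver.
class ConcurrentCountAccumulator[T] extends AccumulatorV2[T, java.util.Map[T, Long]] {

  private val counts = new ConcurrentHashMap[T, Long]()
  private val sum: java.util.function.BiFunction[Long, Long, Long] = (a, b) => a + b

  override def isZero: Boolean = counts.isEmpty

  override def copy(): ConcurrentCountAccumulator[T] = {
    val acc = new ConcurrentCountAccumulator[T]
    acc.counts.putAll(counts)
    acc
  }

  override def reset(): Unit = counts.clear()

  override def add(key: T): Unit = counts.merge(key, 1L, sum)

  override def merge(other: AccumulatorV2[T, java.util.Map[T, Long]]): Unit = {
    val it = other.value.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      counts.merge(e.getKey, e.getValue, sum)
    }
  }

  // Snapshot copy, so callers never see a map that is mutating underneath them.
  override def value: java.util.Map[T, Long] = new java.util.HashMap[T, Long](counts)
}
```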
From: Something Something <mailinglist...@gmail.com>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context: we have our own Accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:

```scala
class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]
```

When the job begins, we register an instance of this class:

```scala
spark.sparkContext.register(myAccumulator, "MyAccumulator")
```

Is this working under Structured Streaming?

I will keep looking for alternate approaches, but any help would be greatly appreciated. Thanks.
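If the custom accumulator were swapped for a thread-safe variant such as the ConcurrentCountAccumulator sketched earlier in this thread, the registration shown above would stay the same. Illustrative usage only; the key name is made up:

```scala
// Create and register on the driver (assuming the earlier sketch).
val myAccumulator = new ConcurrentCountAccumulator[String]
spark.sparkContext.register(myAccumulator, "MyAccumulator")

// Incremented inside the stateful mapping function on executors:
myAccumulator.add("event-type-A")

// Read back on the driver, e.g. in onQueryProgress or a polling loop:
println(myAccumulator.value)
```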
On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:

In my Structured Streaming job I am updating Spark Accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code:

```scala
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
  updateAcrossEvents
)
```

The accumulators get incremented in 'updateAcrossEvents'. I have a StreamingListener which writes the values of the accumulators in its 'onQueryProgress' method, but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in 'Cluster' mode. In 'Local' mode it works fine, which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.
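For context, `updateAcrossEvents` itself is never shown in this thread. With the curried Scala mapGroupsWithState API used above, it would have roughly this shape; the types and state semantics here are assumptions for illustration only:

```scala
import org.apache.spark.sql.streaming.GroupState

// Hypothetical input type; the real one isn't shown in the thread.
case class Event(id: String, payload: String)

// Assumed shape: String keys, Event values, a Long running count as state.
def updateAcrossEvents(key: String, events: Iterator[Event], state: GroupState[Long]): Long = {
  myAcc.add(1) // accumulator captured from the driver's closure; must be registered first
  val newCount = state.getOption.getOrElse(0L) + events.size
  state.update(newCount)
  newCount
}
```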