What is scary is that this interface is marked as "experimental":
@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
    R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}
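
Separately, on the AccumulatorV2 restriction raised further down this thread (the OUT type should be atomic or thread-safe): below is a minimal plain-Java sketch of a per-key counter whose value can be read safely from another thread. It is only an illustration of the idea, not the CollectionLongAccumulator from this thread. It does not extend Spark's AccumulatorV2, and the class name `KeyedLongCounter` and its methods are hypothetical, chosen only to mirror the add/merge/value shape.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in, NOT a Spark class: a per-key counter backed by a
// ConcurrentHashMap so that updates and reads are safe across threads.
final class KeyedLongCounter<T> {
    private final ConcurrentHashMap<T, Long> counts = new ConcurrentHashMap<>();

    // Atomically increment the count for one key.
    void add(T key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Fold another counter's per-key totals into this one.
    void merge(KeyedLongCounter<T> other) {
        other.counts.forEach((k, v) -> counts.merge(k, v, Long::sum));
    }

    // True when nothing has been counted yet.
    boolean isZero() {
        return counts.isEmpty();
    }

    // Drop all counts.
    void reset() {
        counts.clear();
    }

    // Return a read-only copy, so callers never observe later mutations.
    Map<T, Long> value() {
        return Collections.unmodifiableMap(new HashMap<>(counts));
    }
}
```

If something like this were adapted into a real AccumulatorV2 subclass, the copy returned by `value()` is the part that matters for the listener thread: it reads a snapshot instead of a map that tasks may still be merging into.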
On Mon, Jun 8, 2020 at 11:54 AM Something Something <
[email protected]> wrote:
> Right, this is exactly how I have it right now. The problem is that in
> cluster mode 'myAcc' does NOT get distributed. Try it out in cluster mode
> and you will see what I mean.
>
> I think the way Zhang is using it will work. I will try it and report back.
>
> On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <[email protected]> wrote:
>
>>
>> You don’t need to have a separate class. I created one because it holds a
>> lot of code and logic in my case.
>> To test quickly, you can use Zhang’s Scala code from this thread.
>> I am pasting it below for quick reference:
>>
>> ```scala
>> spark.streams.addListener(new StreamingQueryListener {
>>   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>     println(event.progress.id + " is on progress")
>>     println(s"My accu is ${myAcc.value} on query progress")
>>   }
>>   ...
>> })
>>
>> def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>   myAcc.add(1)
>>   println(s">>> key: $key => state: ${state}")
>>   ...
>> }
>>
>> val wordCounts = words
>>   .groupByKey(v => ...)
>>   .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>
>> val query = wordCounts.writeStream
>>   .outputMode(OutputMode.Update)
>>   ...
>> ```
>>
>> On Mon, Jun 8, 2020 at 11:14 AM Something Something <
>> [email protected]> wrote:
>>
>>> Great. I guess the trick is to use a separate class such as
>>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>>> Scala. Will try it out & revert. Thanks for the tips.
>>>
>>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <[email protected]> wrote:
>>>
>>>> The following Java code works in my cluster environment:
>>>> ```
>>>> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>>>         myAcc.add(1);
>>>>         <...>
>>>>         state.update(newState);
>>>>         return new LeadingCharCount(key, newState);
>>>>     },
>>>>     Encoders.LONG(),
>>>>     Encoders.bean(LeadingCharCount.class),
>>>>     GroupStateTimeout.ProcessingTimeTimeout())
>>>> ```
>>>>
>>>> Also works fine with my `StateUpdateTask`:
>>>> ```
>>>> .mapGroupsWithState(
>>>>     new StateUpdateTask(myAcc),
>>>>     Encoders.LONG(),
>>>>     Encoders.bean(LeadingCharCount.class),
>>>>     GroupStateTimeout.ProcessingTimeTimeout());
>>>>
>>>> public class StateUpdateTask
>>>>         implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>>>     private LongAccumulator myAccInTask;
>>>>
>>>>     public StateUpdateTask(LongAccumulator acc) {
>>>>         this.myAccInTask = acc;
>>>>     }
>>>>
>>>>     @Override
>>>>     public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
>>>>         myAccInTask.add(1);
>>>>         <...>
>>>>         state.update(newState);
>>>>         return new LeadingCharCount(key, newState);
>>>>     }
>>>> }
>>>> ```
>>>>
>>>> --
>>>> Cheers,
>>>> -z
>>>>
>>>> On Tue, 2 Jun 2020 10:28:36 +0800
>>>> ZHANG Wei <[email protected]> wrote:
>>>>
>>>> > Yes, verified on the cluster with 5 executors.
>>>> >
>>>> > --
>>>> > Cheers,
>>>> > -z
>>>> >
>>>> > On Fri, 29 May 2020 11:16:12 -0700
>>>> > Something Something <[email protected]> wrote:
>>>> >
>>>> > > Did you try this on the Cluster? Note: This works just fine under
>>>> > > 'Local' mode.
>>>> > >
>>>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <[email protected]> wrote:
>>>> > >
>>>> > > > I can't reproduce the issue with my simple code:
>>>> > > > ```scala
>>>> > > > spark.streams.addListener(new StreamingQueryListener {
>>>> > > >   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>> > > >     println(event.progress.id + " is on progress")
>>>> > > >     println(s"My accu is ${myAcc.value} on query progress")
>>>> > > >   }
>>>> > > >   ...
>>>> > > > })
>>>> > > >
>>>> > > > def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>>> > > >   myAcc.add(1)
>>>> > > >   println(s">>> key: $key => state: ${state}")
>>>> > > >   ...
>>>> > > > }
>>>> > > >
>>>> > > > val wordCounts = words
>>>> > > >   .groupByKey(v => ...)
>>>> > > >   .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>>> > > >
>>>> > > > val query = wordCounts.writeStream
>>>> > > >   .outputMode(OutputMode.Update)
>>>> > > >   ...
>>>> > > > ```
>>>> > > >
>>>> > > > I'm wondering whether any errors can be found in the driver logs?
>>>> > > > Micro-batch exceptions won't terminate the running streaming job.
>>>> > > >
>>>> > > > For the following code, we have to make sure that `StateUpdateTask` is
>>>> > > > started:
>>>> > > > > .mapGroupsWithState(
>>>> > > > >     new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>> > > > >         appConfig, accumulators),
>>>> > > > >     Encoders.bean(ModelStateInfo.class),
>>>> > > > >     Encoders.bean(ModelUpdate.class),
>>>> > > > >     GroupStateTimeout.ProcessingTimeTimeout());
>>>> > > >
>>>> > > > --
>>>> > > > Cheers,
>>>> > > > -z
>>>> > > >
>>>> > > > On Thu, 28 May 2020 19:59:31 +0530
>>>> > > > Srinivas V <[email protected]> wrote:
>>>> > > >
>>>> > > > > Giving the code below:
>>>> > > > > // accumulators is a class-level variable in the driver.
>>>> > > > >
>>>> > > > > sparkSession.streams().addListener(new StreamingQueryListener() {
>>>> > > > >     @Override
>>>> > > > >     public void onQueryStarted(QueryStartedEvent queryStarted) {
>>>> > > > >         logger.info("Query started: " + queryStarted.id());
>>>> > > > >     }
>>>> > > > >     @Override
>>>> > > > >     public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
>>>> > > > >         logger.info("Query terminated: " + queryTerminated.id());
>>>> > > > >     }
>>>> > > > >     @Override
>>>> > > > >     public void onQueryProgress(QueryProgressEvent queryProgress) {
>>>> > > > >         accumulators.eventsReceived(queryProgress.progress().numInputRows());
>>>> > > > >         long eventsReceived = 0;
>>>> > > > >         long eventsExpired = 0;
>>>> > > > >         long eventSentSuccess = 0;
>>>> > > > >         try {
>>>> > > > >             eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>>>> > > > >             eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>>>> > > > >             eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>>>> > > > >         } catch (MissingKeyException e) {
>>>> > > > >             logger.error("Accumulator key not found due to Exception {}", e.getMessage());
>>>> > > > >         }
>>>> > > > >         logger.info("Events Received:{}", eventsReceived);
>>>> > > > >         logger.info("Events State Expired:{}", eventsExpired);
>>>> > > > >         logger.info("Events Sent Success:{}", eventSentSuccess);
>>>> > > > >         logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
>>>> > > > >                 queryProgress.progress().batchId(),
>>>> > > > >                 queryProgress.progress().numInputRows(),
>>>> > > > >                 queryProgress.progress().inputRowsPerSecond(),
>>>> > > > >                 queryProgress.progress().processedRowsPerSecond(),
>>>> > > > >                 queryProgress.progress().durationMs());
>>>> > > > >
>>>> > > > >
>>>> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <[email protected]>
>>>> wrote:
>>>> > > > >
>>>> > > > > > May I ask how the accumulator is accessed in the method
>>>> > > > > > `onQueryProgress()`?
>>>> > > > > >
>>>> > > > > > AFAICT, the accumulator is incremented correctly. There is a way to
>>>> > > > > > verify that on a cluster, like this:
>>>> > > > > > ```
>>>> > > > > > // Add the following while loop before invoking awaitTermination
>>>> > > > > > while (true) {
>>>> > > > > >   println("My acc: " + myAcc.value)
>>>> > > > > >   Thread.sleep(5 * 1000)
>>>> > > > > > }
>>>> > > > > >
>>>> > > > > > //query.awaitTermination()
>>>> > > > > > ```
>>>> > > > > >
>>>> > > > > > The updated accumulator value can then be found in the driver stdout.
>>>> > > > > >
>>>> > > > > > --
>>>> > > > > > Cheers,
>>>> > > > > > -z
>>>> > > > > >
>>>> > > > > > On Thu, 28 May 2020 17:12:48 +0530
>>>> > > > > > Srinivas V <[email protected]> wrote:
>>>> > > > > >
>>>> > > > > > > Yes, I am using stateful structured streaming, similar to what you
>>>> > > > > > > do. This is in Java.
>>>> > > > > > > I do it this way:
>>>> > > > > > > Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>> > > > > > >     .groupByKey(
>>>> > > > > > >         (MapFunction<InputEventModel, String>) event -> event.getId(),
>>>> > > > > > >         Encoders.STRING())
>>>> > > > > > >     .mapGroupsWithState(
>>>> > > > > > >         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>> > > > > > >             appConfig, accumulators),
>>>> > > > > > >         Encoders.bean(ModelStateInfo.class),
>>>> > > > > > >         Encoders.bean(ModelUpdate.class),
>>>> > > > > > >         GroupStateTimeout.ProcessingTimeTimeout());
>>>> > > > > > >
>>>> > > > > > > StateUpdateTask contains the update method.
>>>> > > > > > >
>>>> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>> > > > > > > [email protected]> wrote:
>>>> > > > > > >
>>>> > > > > > > > Yes, that's exactly how I am creating them.
>>>> > > > > > > >
>>>> > > > > > > > Question... Are you using 'Stateful Structured Streaming' in which
>>>> > > > > > > > you have something like this?
>>>> > > > > > > >
>>>> > > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>> > > > > > > >   updateAcrossEvents
>>>> > > > > > > > )
>>>> > > > > > > >
>>>> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
>>>> > > > > > > > experiencing this only under 'Stateful Structured Streaming'. In
>>>> > > > > > > > other streaming applications it works as expected.
>>>> > > > > > > >
>>>> > > > > > > >
>>>> > > > > > > >
>>>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <
>>>> [email protected]>
>>>> > > > > > wrote:
>>>> > > > > > > >
>>>> > > > > > > >> Yes, I am talking about application-specific accumulators. Actually,
>>>> > > > > > > >> I am getting the values printed in my driver log as well as sent to
>>>> > > > > > > >> Grafana. Not sure where and when I saw 0 before. My deploy mode is
>>>> > > > > > > >> “client” on a YARN cluster (not local Mac), where I submit from the
>>>> > > > > > > >> master node. It should work the same for cluster mode as well.
>>>> > > > > > > >> Create accumulators like this:
>>>> > > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>> > > > > > > >>
>>>> > > > > > > >>
>>>> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>> > > > > > > >> [email protected]> wrote:
>>>> > > > > > > >>
>>>> > > > > > > >>> Hmm... how would they go to Grafana if they are not getting
>>>> > > > > > > >>> computed in your code? I am talking about the application-specific
>>>> > > > > > > >>> accumulators. The other standard counters such as
>>>> > > > > > > >>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>> > > > > > > >>>
>>>> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <
>>>> [email protected]>
>>>> > > > > > wrote:
>>>> > > > > > > >>>
>>>> > > > > > > >>>> Hello,
>>>> > > > > > > >>>> Even for me it comes as 0 when I print in onQueryProgress. I use
>>>> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on the
>>>> > > > > > > >>>> cluster. But one consolation is that when I send metrics to Grafana,
>>>> > > > > > > >>>> the values are coming there.
>>>> > > > > > > >>>>
>>>> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>> > > > > > > >>>> [email protected]> wrote:
>>>> > > > > > > >>>>
>>>> > > > > > > >>>>> No this is not working even if I use LongAccumulator.
>>>> > > > > > > >>>>>
>>>> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <
>>>> [email protected]
>>>> > > > >
>>>> > > > > > wrote:
>>>> > > > > > > >>>>>
>>>> > > > > > > >>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type
>>>> > > > > > > >>>>>> should be atomic or thread-safe. I'm wondering whether the
>>>> > > > > > > >>>>>> implementation for `java.util.Map[T, Long]` can meet it or not. Is
>>>> > > > > > > >>>>>> there any chance to replace CollectionLongAccumulator with
>>>> > > > > > > >>>>>> CollectionAccumulator [2] or LongAccumulator [3] and test whether
>>>> > > > > > > >>>>>> the StreamingListener and other code are able to work?
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> ---
>>>> > > > > > > >>>>>> Cheers,
>>>> > > > > > > >>>>>> -z
>>>> > > > > > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>> > > > > > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>> > > > > > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> ________________________________________
>>>> > > > > > > >>>>>> From: Something Something <[email protected]>
>>>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
>>>> > > > > > > >>>>>> To: spark-user
>>>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured
>>>> > > > Streaming
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> Can someone from the Spark development team tell me if this
>>>> > > > > > > >>>>>> functionality is supported and tested? I've spent a lot of time on
>>>> > > > > > > >>>>>> this but can't get it to work. Just to add more context, we have our
>>>> > > > > > > >>>>>> own Accumulator class that extends AccumulatorV2. In this class we
>>>> > > > > > > >>>>>> keep track of one or more accumulators. Here's the definition:
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> class CollectionLongAccumulator[T]
>>>> > > > > > > >>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> When the job begins we register an instance of this class:
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> Is this working under Structured Streaming?
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> I will keep looking for alternate approaches but any help would be
>>>> > > > > > > >>>>>> greatly appreciated. Thanks.
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>> > > > > > > >>>>>> [email protected]> wrote:
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> In my structured streaming job I am updating Spark Accumulators in
>>>> > > > > > > >>>>>> the updateAcrossEvents method, but they are always 0 when I try to
>>>> > > > > > > >>>>>> print them in my StreamingListener. Here's the code:
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>> > > > > > > >>>>>>   updateAcrossEvents
>>>> > > > > > > >>>>>> )
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>>>> > > > > > > >>>>>> StreamingListener which writes the values of the accumulators in the
>>>> > > > > > > >>>>>> 'onQueryProgress' method, but in this method the Accumulators are
>>>> > > > > > > >>>>>> ALWAYS ZERO!
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> When I added log statements in updateAcrossEvents, I could see
>>>> > > > > > > >>>>>> that these accumulators are getting incremented as expected.
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> This only happens when I run in 'Cluster' mode. In Local mode it
>>>> > > > > > > >>>>>> works fine, which implies that the Accumulators are not getting
>>>> > > > > > > >>>>>> distributed correctly - or something like that!
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>> > > > > > > >>>>>> perform an "Action". That's not a solution here. This is a
>>>> > > > > > > >>>>>> 'Stateful Structured Streaming' job. Yes, I am also 'registering'
>>>> > > > > > > >>>>>> them in SparkContext.
>>>>
>>>