Yes, I agree that the blacklisting structure can be put in the user-defined
state, but wouldn't the state then remain open for a long time? Am I
misunderstanding something?

I like the idea of keeping the blacklist in a "Broadcast" variable, but I
can't figure out how to use the "Broadcast" variable inside the
'mapGroupsWithState' function. For example, I have this code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
  updateAcrossEvents
)

and updateAcrossEvents is defined as:

def updateAcrossEvents(
    tuple5: (String, String, String, String, String),
    inputs: Iterator[MyObject],
    oldState: GroupState[MyState])


How do I pass a "Broadcast" variable into the 'updateAcrossEvents'
method? Please advise. Thanks.
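One common workaround is to curry the update function so its first parameter list takes the broadcast handle, then partially apply it at the call site; the resulting closure captures the handle and ships it to the executors. Below is a minimal, Spark-free sketch of just that mechanism. `FakeBroadcast` is a stand-in for `org.apache.spark.broadcast.Broadcast`, and the key/input/state types are simplified placeholders, not the types from the thread.

```scala
// Sketch of the currying pattern, assuming a blacklist of String IDs.
// FakeBroadcast stands in for org.apache.spark.broadcast.Broadcast[T];
// the real handle is captured by the closure in exactly the same way.
object BroadcastCurryDemo {
  final case class FakeBroadcast[T](value: T) // stand-in for Broadcast[T]

  // The second parameter list mirrors the (key, inputs, state) shape that
  // mapGroupsWithState expects of its update function.
  def updateAcrossEvents(blacklist: FakeBroadcast[Set[String]])(
      key: String,
      inputs: Iterator[Int],
      oldState: Option[Int]): Option[Int] = {
    if (blacklist.value.contains(key)) None // blacklisted: drop the state
    else Some(oldState.getOrElse(0) + inputs.sum) // normal aggregation
  }

  def main(args: Array[String]): Unit = {
    val bc = FakeBroadcast(Set("badId"))
    // Partial application yields a closure that captures the broadcast handle.
    val update = updateAcrossEvents(bc) _
    println(update("goodId", Iterator(1, 2, 3), Some(10))) // Some(16)
    println(update("badId", Iterator(1), Some(5)))         // None
  }
}
```

In the real job the call site would then look like `.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents(blacklistBc))`, where `blacklistBc` is a hypothetical `Broadcast[Set[String]]` obtained from `sparkContext.broadcast(...)`.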



On Mon, Nov 16, 2020 at 3:40 AM Yuanjian Li <xyliyuanj...@gmail.com> wrote:

> If you use the `flatMap/mapGroupsWithState` API for a "stateful" SS job,
> the blacklisting structure can be put into the user-defined state.
> Using a 3rd-party cache would also be a good choice.
>
> Eric Beabes <mailinglist...@gmail.com> wrote on Wed, Nov 11, 2020 at 6:54 AM:
>
>> Currently we have a "stateful" Spark Structured Streaming job that computes
>> aggregates for each ID. I need to implement a new requirement: if the
>> number of incoming messages for a particular ID exceeds a certain value,
>> add that ID to a blacklist and remove its state. Going forward, we will not
>> create state for any blacklisted ID; its messages will simply be filtered
>> out.
>>
>> What’s the best way to implement this in Spark Structured Streaming?
>> Essentially what we need to do is create a Distributed HashSet that gets
>> updated intermittently & make this HashSet available to all Executors so
>> that they can filter out unwanted messages.
>>
>> Any pointers would be greatly appreciated. Is the only option to use a
>> 3rd-party Distributed Cache tool such as EhCache, Redis, etc.?
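The filtering half of the quoted requirement can be sketched independently of how the blacklist is distributed (broadcast variable or 3rd-party cache): each executor drops messages whose ID is in the set before any state is created. A minimal, Spark-free sketch, with `Message` and its fields as assumed placeholder names:

```scala
// Sketch: drop messages whose ID appears in the shared blacklist set.
object BlacklistFilterDemo {
  final case class Message(id: String, payload: String) // assumed shape

  def filterBlacklisted(messages: Seq[Message], blacklist: Set[String]): Seq[Message] =
    messages.filterNot(m => blacklist.contains(m.id)) // keep non-blacklisted IDs

  def main(args: Array[String]): Unit = {
    val blacklist = Set("id-2")
    val in = Seq(Message("id-1", "a"), Message("id-2", "b"), Message("id-3", "c"))
    println(filterBlacklisted(in, blacklist).map(_.id)) // List(id-1, id-3)
  }
}
```

In Spark, the same predicate could run as a typed Dataset filter, e.g. `ds.filter(m => !blacklistBc.value.contains(m.id))`, with `blacklistBc` again a hypothetical broadcast handle.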
