This is probably because you're trying to index into the result of the
GroupByKey in your AnalyzeSession as if it were a list. All that is
promised is that it is an iterable. If it is large enough to merit
splitting over multiple fetches, it won't be a list. (If you need to
index, explicitly convert it into a list first, assuming it fits into
memory. Otherwise just stick to re-iterating over it.)
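
For example, a minimal sketch of what the fix could look like inside
AnalyzeSession -- the process body here is hypothetical, since you didn't
share yours:

import apache_beam as beam

class AnalyzeSession(beam.DoFn):
    def process(self, element):
        key, events = element
        # `events` is only guaranteed to be an iterable. Under load the
        # runner may hand you a lazily-fetched _ConcatSequence, so
        # events[0], len(events), or slicing will raise TypeError.
        events = list(events)  # safe only if one session fits in memory
        # ... analyze `events` as a real list from here on ...
        yield (key, events)

If a single session may be too large for memory, drop the list() call and
only ever loop over `events`; it can be iterated more than once.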

On Mon, Jan 22, 2024 at 7:21 AM Nimrod Shory <[email protected]> wrote:
>
> Hello everyone,
> We are encountering a weird issue while running a Python + Beam streaming
> job on Google Cloud Dataflow.
> The job listens to a PubSub subscription of events, and my pipeline looks 
> like this:
>
>> messages = (
>>     p
>>     | "Read Topic" >> beam.io.ReadFromPubSub(subscription=options.subscription.get())
>>     | "JSON" >> beam.Map(json.loads)
>> )
>> sessions = (
>>     messages
>>     | "Add Keys" >> beam.WithKeys(lambda x: x["id"])
>>     | "Session Window" >> beam.WindowInto(beam.window.Sessions(SESSION_TIMEOUT))
>>     | beam.GroupByKey()
>>     | "Analyze Session" >> beam.ParDo(AnalyzeSession())
>> )
>> sessions | beam.io.WriteToPubSub(topic=options.session_topic.get())
>
>
>
> After it runs for some time without any issues, I suddenly start getting the 
> following error:
>
>> TypeError: '_ConcatSequence' object is not subscriptable
>
>
> Instead of the expected key-value pair I usually get:
>>
>> ('ID123', [{...},{...},{...}])
>
>
> I start getting:
>>
>> ('ID234', <apache_beam.coders.coder_impl._ConcatSequence object at 0x7feca40d1d90>)
>
>
> I suspect this happens due to heavy load, but I could not find any
> information on why it happens or how to mitigate it.
>
> Any help would be much appreciated!
> Thanks.
