This is probably because you're trying to index into the result of the GroupByKey in your AnalyzeSession as if it were a list. All that is promised is that it is an iterable. If it is large enough to merit splitting over multiple fetches, it won't be a list. (If you need to index, explicitly convert it into a list first, assuming it fits into memory. Otherwise just stick to re-iterating over it.)
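For example, a minimal sketch (untested; the summary fields and the JSON encoding here are made up, adapt them to whatever your AnalyzeSession actually computes):

    import json
    import apache_beam as beam

    class AnalyzeSession(beam.DoFn):
        def process(self, element):
            key, events = element  # events is only guaranteed to be an Iterable

            # Safe regardless of size: iterate (re-iterating is allowed too).
            count = sum(1 for _ in events)

            # Only if you truly need random access, and the session fits in
            # memory: materialize the iterable explicitly.
            events = list(events)
            summary = {"id": key, "count": count,
                       "first": events[0], "last": events[-1]}

            # WriteToPubSub expects bytes.
            yield json.dumps(summary).encode("utf-8")

The list() call is what trades memory for indexability. That would also be consistent with only seeing this under heavy load: small groups arrive as plain lists, while larger ones get split over multiple fetches and show up as a lazy _ConcatSequence instead.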
On Mon, Jan 22, 2024 at 7:21 AM Nimrod Shory <[email protected]> wrote:
>
> Hello everyone,
> We are encountering a weird issue while running a Python + Beam streaming
> job on Google Cloud Dataflow.
> The job listens to a PubSub subscription of events, and my pipeline looks
> like this:
>
>> messages = (
>>     p | "Read Topic" >>
>>     beam.io.ReadFromPubSub(subscription=options.subscription.get())
>>     | "JSON" >> beam.Map(json.loads)
>> )
>> sessions = (
>>     messages | "Add Keys" >> beam.WithKeys(lambda x: x["id"])
>>     | "Session Window" >>
>>     beam.WindowInto(beam.window.Sessions(SESSION_TIMEOUT))
>>     | beam.GroupByKey()
>>     | "Analyze Session" >> beam.ParDo(AnalyzeSession())
>> )
>> sessions | beam.io.WriteToPubSub(topic=options.session_topic.get())
>
> After it runs for some time without any issues, I suddenly start getting
> the following error:
>
>> TypeError: '_ConcatSequence' object is not subscriptable
>
> Instead of the expected key-value pair I usually get:
>
>> ('ID123', [{...},{...},{...}])
>
> I start getting:
>
>> ('ID234', <apache_beam.coders.coder_impl._ConcatSequence object at
>> 0x7feca40d1d90>)
>
> I suspect this happens due to heavy load, but I could not find any
> information on why it could happen and how to mitigate it.
>
> Any help would be much appreciated!
> Thanks.
