Thank you very much, Robert.
That makes a lot of sense.

On Mon, 22 Jan 2024 at 18:40, Robert Bradshaw via user <[email protected]> wrote:

> This is probably because you're trying to index into the result of the
> GroupByKey in your AnalyzeSession as if it were a list. All that is
> promised is that it is an iterable. If it is large enough to merit
> splitting over multiple fetches, it won't be a list. (If you need to
> index, explicitly convert it into a list first, assuming it fits into
> memory. Otherwise just stick to re-iterating over it.)
>
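
For anyone who finds this thread later, here is a minimal sketch of the two options Robert describes, written as plain Python so it runs without Beam installed. `ReiterableOnly` is a hypothetical stand-in for the non-list iterable that GroupByKey may hand back (it is not Beam's `_ConcatSequence`), and the `count`/`first`/`last` fields are made up for illustration. It assumes each element is a `(key, events)` pair, as in the quoted pipeline.

```python
class ReiterableOnly:
    """Hypothetical stand-in: can be iterated repeatedly, but not indexed."""

    def __init__(self, *chunks):
        self._chunks = chunks

    def __iter__(self):
        # Yield events chunk by chunk, as if they arrived over several fetches.
        for chunk in self._chunks:
            yield from chunk


def summarize_session(element):
    """Option 1: materialize the values once, then index freely.

    Assumes the whole session fits in memory.
    """
    key, events = element
    events = list(events)  # works for a list or any other iterable
    if not events:
        return {"id": key, "count": 0, "first": None, "last": None}
    return {
        "id": key,
        "count": len(events),
        "first": events[0],
        "last": events[-1],
    }


def count_events(element):
    """Option 2: never index, just iterate (no full copy held in memory)."""
    key, events = element
    return key, sum(1 for _ in events)
```

Inside the DoFn, `process(self, element)` could then do something like `yield summarize_session(element)`. That is only an assumption about how `AnalyzeSession` is structured, since its code is not shown in this thread.
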
> On Mon, Jan 22, 2024 at 7:21 AM Nimrod Shory <[email protected]> wrote:
> >
> > Hello everyone,
> > We are encountering a strange issue while running a Python Beam streaming job on Google Cloud Dataflow.
> > The job listens to a Pub/Sub subscription of events, and my pipeline looks like this:
> >
> >> messages = (
> >>     p
> >>     | "Read Topic" >> beam.io.ReadFromPubSub(subscription=options.subscription.get())
> >>     | "JSON" >> beam.Map(json.loads)
> >> )
> >> sessions = (
> >>     messages
> >>     | "Add Keys" >> beam.WithKeys(lambda x: x["id"])
> >>     | "Session Window" >> beam.WindowInto(beam.window.Sessions(SESSION_TIMEOUT))
> >>     | beam.GroupByKey()
> >>     | "Analyze Session" >> beam.ParDo(AnalyzeSession())
> >> )
> >> sessions | beam.io.WriteToPubSub(topic=options.session_topic.get())
> >
> >
> >
> > After it runs for some time without any issues, I suddenly start getting the following error:
> >
> >> TypeError: '_ConcatSequence' object is not subscriptable
> >
> >
> > Instead of the expected key-value pair that I usually get:
> >>
> >> ('ID123', [{...},{...},{...}])
> >
> >
> > I start getting:
> >>
> >> ('ID234', <apache_beam.coders.coder_impl._ConcatSequence object at 0x7feca40d1d90>)
> >
> >
> > I suspect this happens under heavy load, but I could not find any information on why it happens or how to mitigate it.
> >
> > Any help would be much appreciated!
> > Thanks.
>


-- 

*  Nimrod Shory*
  Chief Technology Officer
   +1 (650) 761-9944
   +972 (524) 703-024
  https://www.fugu-it.com
