Yep, that is the basic pattern I am looking for. A couple of comments:

1. I can "poke" the side input via pubsub whenever I want to update it, so I
don't need any other mechanism to force a reload.
2. I don't need to reprocess elements when a new side input arrives. As long
as the side input is updated eventually (within reason!), that is fine.

Any suggestions on using existing mechanisms to make this work?

On a side note, the reason I'm looking into side inputs: I tried having each
DoFn initialize the schema map on its first element, but that creates a
feedback loop: element 1 takes a while -> Dataflow starts more threads ->
those also take a while, since each needs its own schema map -> more threads
-> more maps created -> OOM! A sketch of the kind of per-worker sharing that
might avoid this is just below.
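
Minimal, untested sketch of that sharing: one schema map per worker JVM
behind a static field with double-checked locking, so concurrent DoFn
instances reuse a single copy instead of each building their own.
loadSchemas, getNameFromElement, and parse are placeholder stand-ins for my
real helpers:

import com.google.api.services.bigquery.model.TableRow;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;

public class ParseWithSharedSchemasFn extends DoFn<String, TableRow> {
  // One schema map per worker JVM, shared across threads, so concurrent
  // DoFn instances don't each build (and hold) a private copy.
  private static volatile Map<String, String> schemas;

  private static Map<String, String> getSchemas() {
    // Double-checked locking: only the first thread builds the map; the
    // volatile read lets later threads see the published reference.
    if (schemas == null) {
      synchronized (ParseWithSharedSchemasFn.class) {
        if (schemas == null) {
          schemas = loadSchemas();
        }
      }
    }
    return schemas;
  }

  // Placeholder: really this would fetch and parse the full schema message.
  private static Map<String, String> loadSchemas() {
    return new HashMap<>();
  }

  // Placeholders for the helpers used in the pipeline quoted below.
  private static String getNameFromElement(String element) {
    return element;
  }

  private static TableRow parse(String element, String schema) {
    return new TableRow().set("element", element).set("schema", schema);
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    String schema = getSchemas().get(getNameFromElement(c.element()));
    c.output(parse(c.element(), schema));
  }
}
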
On Wed, Jun 14, 2017 at 12:57 PM, Eugene Kirpichov <[email protected]> wrote:

> Seems related to https://issues.apache.org/jira/browse/BEAM-1197?
>
> On Wed, Jun 14, 2017 at 11:39 AM Kevin Peterson <[email protected]> wrote:
>
>> Hi all,
>>
>> I am working on a (streaming) pipeline which reads elements from Pubsub,
>> and schemas for those elements from a separate pubsub topic. I'd like to
>> be able to create a side input map from the schema topic, and have that
>> available to the main pipeline for parsing. Each message on the schema
>> pubsub topic contains all schemas I care about, so for every new message,
>> I want to generate a new map that will be available to the main pipeline
>> (eventual consistency is fine). I don't have any windows or triggers on
>> the main flow, since I really just want each element to be processed as
>> it arrives, using whatever the latest schema available is.
>>
>> I am currently trying this with:
>>
>> PCollection<KV<String, String>> schema = pipeline
>>     .apply("Read Schema",
>>         PubsubIO.readStrings().fromTopic("topic_for_schema"))
>>     .apply(Window.<String>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .discardingFiredPanes())
>>     .apply("Create Schema",
>>         ParDo.of(new SchemaDirectory.GenerateSchema()));
>>     // outputs around 100 elements for each input
>>
>> PCollectionView<Map<String, String>> schemaView =
>>     schema.apply(View.<String, String>asMap());
>>
>> pipeline
>>     .apply("Read Elements",
>>         PubsubIO.readStrings().fromTopic("topic_for_elements"))
>>     .apply("Parse Elements", ParDo.of(new DoFn<String, TableRow>() {
>>       @ProcessElement
>>       public void processElement(ProcessContext c) {
>>         String name = getNameFromElement(c.element());
>>         String schema = c.sideInput(schemaView).get(name);
>>         c.output(parse(c, schema));
>>       }
>>     }).withSideInputs(schemaView))
>>     .apply("Write to Table", BigQueryIO.writeTableRows());
>>     // Other BQ options not copied.
>>
>> When running this pipeline, the
>> View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/
>> Combine.perKey(Concatenate)/GroupByKey stage never emits any elements,
>> and so the pipeline never progresses. I can see the messages at the input
>> stage, but nothing appears on the output.
>>
>> Any advice?
>>
>> Thanks,
>> -Kevin
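
P.S. For concreteness, the direction I'm considering, based on the pipeline
quoted above: fold the schema KVs into a single Map per pane and expose it
via Combine.globally(...).asSingletonView() instead of View.asMap, with
accumulating panes so every firing carries the complete map. Again a rough,
untested sketch; MergeMapsFn is a name I made up, and imports are as in the
original pipeline plus org.apache.beam.sdk.transforms.Combine and
java.util.HashMap:

PCollectionView<Map<String, String>> schemaView = schema
    .apply(Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .accumulatingFiredPanes())  // each firing carries the full map
    .apply(Combine.globally(new MergeMapsFn()).asSingletonView());

// Made-up combiner: folds KV pairs into one map, later values overwriting
// earlier ones for the same key.
static class MergeMapsFn extends Combine.CombineFn<
    KV<String, String>, Map<String, String>, Map<String, String>> {
  @Override
  public Map<String, String> createAccumulator() {
    return new HashMap<>();
  }

  @Override
  public Map<String, String> addInput(Map<String, String> acc,
      KV<String, String> input) {
    acc.put(input.getKey(), input.getValue());
    return acc;
  }

  @Override
  public Map<String, String> mergeAccumulators(
      Iterable<Map<String, String>> accumulators) {
    Map<String, String> merged = new HashMap<>();
    for (Map<String, String> acc : accumulators) {
      merged.putAll(acc);
    }
    return merged;
  }

  @Override
  public Map<String, String> extractOutput(Map<String, String> acc) {
    return acc;
  }
}

The parsing DoFn would stay unchanged, since c.sideInput(schemaView) still
yields a Map<String, String>; and because each schema message already
contains all schemas, later firings simply overwrite earlier keys, which
fits the eventual-consistency requirement above.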
