Seems related to https://issues.apache.org/jira/browse/BEAM-1197?
On Wed, Jun 14, 2017 at 11:39 AM Kevin Peterson <[email protected]> wrote:

> Hi all,
>
> I am working on a (streaming) pipeline which reads elements from Pubsub,
> and schemas for those elements from a separate pubsub topic. I'd like to be
> able to create a side input map from the schema topic, and have that
> available to the main pipeline for parsing. Each message on the schema
> pubsub topic contains all schemas I care about, so for every new message, I
> want to generate a new map that will be available to the main pipeline
> (eventual consistency is fine). I don't have any windows or triggers on the
> main flow, since I really just want each element to be processed as it
> arrives, using whatever the latest schema available is.
>
> I am currently trying this with:
>
> PCollection<KV<String, String>> schema = pipeline
>     .apply("Read Schema",
>         PubsubIO.readStrings().fromTopic("topic_for_schema"))
>     .apply(Window.<String>into(new GlobalWindows()).triggering(
>         Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
>     .apply("Create Schema", ParDo.of(new SchemaDirectory.GenerateSchema()));
>     // outputs around 100 elements for each input
>
> PCollectionView<Map<String, String>> schemaView =
>     schema.apply(View.<String, String>asMap());
>
> pipeline
>     .apply("Read Elements",
>         PubsubIO.readStrings().fromTopic("topic_for_elements"))
>     .apply("Parse Elements",
>         ParDo.of(new DoFn<String, TableRow>() {
>             @ProcessElement
>             public void processElement(ProcessContext c) {
>                 String name = getNameFromElement(c.element());
>                 String schema = c.sideInput(schemaView).get(name);
>                 c.output(parse(c, schema));
>             }
>         }).withSideInputs(schemaView))
>     .apply("Write to Table", BigQueryIO.writeTableRows()) // Other BQ options not copied.
>
> When running this pipeline, the
> View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
> stage never emits any elements, and so the pipeline never progresses. I
> can see the messages at the input stage, but nothing appears on the output.
>
> Any advice?
>
> Thanks,
> -Kevin
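
For reference, the semantics the quoted message asks for (every schema message carries the full set of schemas and replaces the previous map, and each element is parsed with whatever map is current) can be sketched in plain Java, outside Beam. This only illustrates the intended behaviour; it is not Beam code and not a fix for the stalled GroupByKey stage. The names `LatestSchemaSketch`, `SchemaHolder`, `onSchemaMessage`, and `lookup` are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, Beam-free illustration of "latest schema map wins".
class LatestSchemaSketch {

    /** Holds the most recent schema map; every schema message replaces it wholesale. */
    static final class SchemaHolder {
        private final AtomicReference<Map<String, String>> latest =
            new AtomicReference<>(Collections.<String, String>emptyMap());

        /** Called once per schema message, which carries all schemas of interest. */
        void onSchemaMessage(Map<String, String> allSchemas) {
            // Copy defensively so later changes by the caller do not leak in.
            latest.set(Collections.unmodifiableMap(new HashMap<>(allSchemas)));
        }

        /** Called per element; returns null if no schema is known for that name yet. */
        String lookup(String name) {
            return latest.get().get(name);
        }
    }

    public static void main(String[] args) {
        SchemaHolder holder = new SchemaHolder();

        Map<String, String> first = new HashMap<>();
        first.put("user", "schema-v1");
        holder.onSchemaMessage(first);
        System.out.println(holder.lookup("user"));

        // A later schema message replaces the whole map, not just one entry.
        Map<String, String> second = new HashMap<>();
        second.put("order", "schema-v1");
        holder.onSchemaMessage(second);
        System.out.println(holder.lookup("user"));
    }
}
```

Note that a lookup before the first schema message returns null, which corresponds to main-input elements arriving before any schema has been published; the pipeline would need to decide how to handle that case.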
