.triggering(
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
        .discardingFiredPanes().withAllowedLateness(Duration.ZERO));

Try the trigger above

From: Kevin Peterson [mailto:[email protected]]
Sent: June 15, 2017 2:39
To: [email protected]
Subject: Fwd: Creating side input map with global window

Hi all,

I am working on a (streaming) pipeline which reads elements from Pubsub, and 
schemas for those elements from a separate pubsub topic. I'd like to be able to 
create a side input map from the schema topic, and have that available to the 
main pipeline for parsing. Each message on the schema pubsub topic contains all 
schemas I care about, so for every new message, I want to generate a new map 
that will be available to the main pipeline (eventual consistency is fine). I 
don't have any windows or triggers on the main flow, since I really just want 
each element to be processed as it arrives, using whatever the latest schema 
available is.
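
To make the intended semantics concrete, here is a minimal plain-Java sketch of the "latest schema wins" behaviour described above. It is only an illustration and not Beam code: the class `LatestSchemaHolder` and its methods `onSchemaMessage` and `lookup` are hypothetical names, and a single in-memory reference stands in for the side input. The assumption, taken from the description, is that each schema message carries every schema, so a new message replaces the previous map instead of being merged into it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only: plain Java, not part of the Beam API.
final class LatestSchemaHolder {
    // Most recently published schema map; starts empty until the first message arrives.
    private final AtomicReference<Map<String, String>> latest =
            new AtomicReference<>(Collections.<String, String>emptyMap());

    // Called once per schema message. The message is assumed to contain all
    // schemas, so the new map replaces the old one wholesale (no merging).
    void onSchemaMessage(Map<String, String> allSchemas) {
        latest.set(Collections.unmodifiableMap(new HashMap<>(allSchemas)));
    }

    // Called for each main-flow element. Returns whatever schema is currently
    // visible for the name, or null if none has been published yet.
    String lookup(String name) {
        return latest.get().get(name);
    }
}
```

A reader that calls `lookup` between two schema messages simply sees the older map, which matches the "eventual consistency is fine" requirement.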

I am currently trying this with:


PCollection<KV<String, String>> schema = pipeline
        .apply("Read Schema",
                PubsubIO.readStrings().fromTopic("topic_for_schema"))
        .apply(Window.<String>into(new GlobalWindows()).triggering(
                Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
        // Outputs around 100 elements for each input.
        .apply("Create Schema", ParDo.of(new SchemaDirectory.GenerateSchema()));



PCollectionView<Map<String, String>> schemaView =
        schema.apply(View.<String, String>asMap());

pipeline
        .apply("Read Elements",
                PubsubIO.readStrings().fromTopic("topic_for_elements"))
        .apply("Parse Elements",
                ParDo.of(new DoFn<String, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        String name = getNameFromElement(c.element());
                        String schema = c.sideInput(schemaView).get(name);
                        c.output(parse(c, schema));
                    }
                }).withSideInputs(schemaView))
        .apply("Write to Table",
                BigQueryIO.writeTableRows()) // Other BQ options not copied.

When running this pipeline, the
View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
stage never emits any elements, and so the pipeline never progresses. I can
see the messages at the input stage, but nothing appears on the output.

Any advice?

Thanks,
-Kevin
