Still gets stuck at the same place :/

On Wed, Jun 14, 2017 at 9:45 PM, Tang Jijun (上海_中台研发部_数据平台部_基础数据部_唐觊隽) <[email protected]> wrote:

>
> .triggering(
>         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
>         .discardingFiredPanes().withAllowedLateness(Duration.ZERO));
>
> Try the trigger above
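A rough plain-Java model of the timing behind that suggestion. This is not Beam code, and `ProcessingTimeTriggerModel` and `firingTimes` are hypothetical names. It assumes the trigger is repeated, for example wrapped in `Repeatedly.forever(...)`, which the quoted snippet does not do; `AfterProcessingTime` on its own fires at most once per window. It also assumes a pane fires exactly `delay` after its first element and that elements arriving at or before that instant join the pane. A real runner only approximates this timing.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (not Beam code) of when panes fire under an assumed
 * Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)).
 * Assumption: a pane fires exactly delayMillis after its first element, and
 * elements arriving at or before that instant join the open pane.
 */
final class ProcessingTimeTriggerModel {

    /** Returns the processing times (ms) at which panes fire, given sorted arrival times. */
    public static List<Long> firingTimes(long[] arrivalMillis, long delayMillis) {
        List<Long> firings = new ArrayList<>();
        Long nextFiring = null; // null means no pane is currently open
        for (long t : arrivalMillis) {
            if (nextFiring != null && t > nextFiring) {
                firings.add(nextFiring); // the open pane fired before this element arrived
                nextFiring = null;
            }
            if (nextFiring == null) {
                nextFiring = t + delayMillis; // first element of a new pane starts the delay
            }
        }
        if (nextFiring != null) {
            firings.add(nextFiring); // the last open pane still fires after its delay
        }
        return firings;
    }

    public static void main(String[] args) {
        // Schema messages arriving at 0 ms, 400 ms and 5000 ms, one-second delay.
        System.out.println(firingTimes(new long[] {0, 400, 5000}, 1000)); // [1000, 6000]
    }
}
```

With schema messages at 0 ms, 400 ms and 5000 ms and a one-second delay, the first two messages share the pane that fires at 1000 ms, and the third message fires its own pane at 6000 ms.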
>
> *From:* Kevin Peterson [mailto:[email protected]]
> *Sent:* June 15, 2017 2:39
> *To:* [email protected]
> *Subject:* Fwd: Creating side input map with global window
>
> Hi all,
>
> I am working on a (streaming) pipeline that reads elements from Pubsub,
> and schemas for those elements from a separate Pubsub topic. I'd like to
> create a side input map from the schema topic and make it available to
> the main pipeline for parsing. Each message on the schema topic contains
> all the schemas I care about, so for every new message I want to generate
> a new map that becomes available to the main pipeline (eventual
> consistency is fine). I don't have any windows or triggers on the main
> flow, since I just want each element to be processed as it arrives,
> using whatever schema is the latest available.
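The behavior described above, where each schema message replaces the whole map and parsing uses whatever map is current, can be pinned down with a small plain-Java model outside Beam. `LatestSchemaHolder` and the `name=schema` line format are hypothetical; an `AtomicReference` swap stands in for the side input being refreshed.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Plain-Java model (not Beam code): every schema message yields a complete new
 * map that atomically replaces the previous one, and lookups read whichever map
 * is current (eventual consistency).
 * Assumption (hypothetical format): a schema message is newline-separated "name=schema" lines.
 */
final class LatestSchemaHolder {
    private final AtomicReference<Map<String, String>> current =
            new AtomicReference<>(Collections.<String, String>emptyMap());

    /** Builds a fresh map from one schema message and swaps it in as a whole. */
    void onSchemaMessage(String message) {
        Map<String, String> next = new HashMap<>();
        for (String line : message.split("\n")) {
            int eq = line.indexOf('=');
            if (eq > 0) {
                next.put(line.substring(0, eq), line.substring(eq + 1));
            }
        }
        // Readers see either the old map or the new one, never a half-built map.
        current.set(Collections.unmodifiableMap(next));
    }

    /** Looks up a schema by name; returns null if the current map does not contain it. */
    String schemaFor(String name) {
        return current.get().get(name);
    }

    public static void main(String[] args) {
        LatestSchemaHolder holder = new LatestSchemaHolder();
        System.out.println(holder.schemaFor("user"));  // null: no schema message yet
        holder.onSchemaMessage("user=v1\norder=v1");
        holder.onSchemaMessage("user=v2");             // replaces the whole map
        System.out.println(holder.schemaFor("user"));  // v2
        System.out.println(holder.schemaFor("order")); // null: absent from the newest message
    }
}
```

Because every message is assumed to carry all schemas, a name missing from the newest message stops resolving; that is the "replace, don't merge" choice the description implies.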
>
> I am currently trying this with:
>
> PCollection<KV<String, String>> schema = pipeline
>         .apply("Read Schema",
>                 PubsubIO.readStrings().fromTopic("topic_for_schema"))
>         .apply(Window.<String>into(new GlobalWindows()).triggering(
>                 Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
>         .apply("Create Schema",
>                 ParDo.of(new SchemaDirectory.GenerateSchema())); // outputs around 100 elements for each input
>
> PCollectionView<Map<String, String>> schemaView =
>         schema.apply(View.<String, String>asMap());
>
> pipeline
>         .apply("Read Elements",
>                 PubsubIO.readStrings().fromTopic("topic_for_elements"))
>         .apply("Parse Elements",
>                 ParDo.of(new DoFn<String, TableRow>() {
>                     @ProcessElement
>                     public void processElement(ProcessContext c) {
>                         String name = getNameFromElement(c.element());
>                         String schema = c.sideInput(schemaView).get(name);
>                         c.output(parse(c, schema));
>                     }
>                 }).withSideInputs(schemaView))
>         .apply("Write to Table",
>                 BigQueryIO.writeTableRows()); // Other BQ options not copied.
>
> When running this pipeline, the
> View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
> stage never emits any elements, and so the pipeline never progresses. I
> can see the messages at the input stage, but nothing appears on the output.
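The `Combine.globally(Concatenate)` / `GroupByKey` stage named above gathers the schema pairs of each pane before the map is built. Below is a minimal plain-Java model (not Beam code; `PaneMapModel` is a hypothetical name) of what each firing would contain under `discardingFiredPanes()` versus `accumulatingFiredPanes()`, assuming every schema message fires its own pane. Throwing on a repeated key mirrors the `View.asMap` requirement of one value per key per window; the exception itself is this model's simplification, not a claim about what a runner does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Beam code) of the map built from each pane firing. */
final class PaneMapModel {

    /** Builds one map per firing; assumes each schema message fires its own pane. */
    static List<Map<String, String>> mapsPerFiring(
            List<Map<String, String>> messages, boolean discardingFiredPanes) {
        List<Map<String, String>> firings = new ArrayList<>();
        List<Map.Entry<String, String>> pane = new ArrayList<>();
        for (Map<String, String> message : messages) {
            if (discardingFiredPanes) {
                pane.clear(); // already-fired elements are not emitted again
            }
            pane.addAll(message.entrySet());
            Map<String, String> view = new HashMap<>();
            for (Map.Entry<String, String> e : pane) {
                // Mirrors the one-value-per-key requirement of a map-shaped view.
                if (view.containsKey(e.getKey())) {
                    throw new IllegalArgumentException("duplicate key in pane: " + e.getKey());
                }
                view.put(e.getKey(), e.getValue());
            }
            firings.add(view);
        }
        return firings;
    }

    public static void main(String[] args) {
        Map<String, String> first = new HashMap<>();
        first.put("user", "v1");
        first.put("order", "v1");
        Map<String, String> second = new HashMap<>();
        second.put("user", "v2");
        List<Map<String, String>> messages = new ArrayList<>();
        messages.add(first);
        messages.add(second);

        // Discarding: the second firing holds only the second message's pairs.
        System.out.println(mapsPerFiring(messages, true).get(1)); // {user=v2}
        try {
            mapsPerFiring(messages, false); // accumulating: "user" appears twice in firing 2
        } catch (IllegalArgumentException e) {
            System.out.println("accumulating: " + e.getMessage());
        }
    }
}
```

With discarding panes, each firing's map holds only the newest message's pairs, which matches the "new map per message" goal; with accumulating panes the second firing repeats schema names from the first message.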
>
> Any advice?
>
> Thanks,
>
> -Kevin
>