Hi Kenneth

Thanks for your reply. I realized that the issue happens when I perform the
Count over the PCollection<KV<Integer, MyClass>>, but it works fine if I
first map the collection to a primitive value type, for example
PCollection<KV<Integer, String>>.
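
The mapping I used for that test looks roughly like this (a minimal sketch;
the transform name and the use of toString() are just illustrative, not my
actual code):

  // Map MyClass values to plain Strings before counting; with this in
  // place the Count produces the expected result.
  PCollection<KV<Integer, String>> asStrings = p
      .apply(kafkaIOReader.withoutMetadata())
      .apply("ToString", MapElements
          .into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
          .via((KV<Integer, MyClass> kv) ->
              KV.of(kv.getKey(), String.valueOf(kv.getValue()))));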

I don't think it is a trigger issue, but rather something with my type. One
thing I tried was implementing my own CombineFn and logging the incoming
rows: only the first element is processed by the function when I use
MyClass.
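
For reference, the debugging CombineFn looks roughly like this (a trimmed
sketch; the class name and log message are just illustrative):

  // Counting CombineFn that logs every input it sees, so elements that
  // never reach addInput() are visible by their absence in the logs.
  static class LoggingCountFn extends Combine.CombineFn<KV<Integer, MyClass>, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long addInput(Long acc, KV<Integer, MyClass> input) {
      logger.info("addInput key={} value={}", input.getKey(), input.getValue());
      return acc + 1;
    }
    @Override public Long mergeAccumulators(Iterable<Long> accs) {
      long sum = 0L;
      for (Long a : accs) { sum += a; }
      return sum;
    }
    @Override public Long extractOutput(Long acc) { return acc; }
  }

I plug it in with Combine.globally(new LoggingCountFn()).withoutDefaults(),
in place of the Count combineFn shown in the code below.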

I haven't tried with the direct runner yet; I will try that. The size of
the collection is about 600 records.
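
For the direct runner test I plan to just switch the runner on the options
(assuming the beam-runners-direct-java artifact is on the classpath):

  // Run the same pipeline locally on the direct runner instead of Flink.
  options.setRunner(org.apache.beam.runners.direct.DirectRunner.class);
  Pipeline p = Pipeline.create(options);

or equivalently pass --runner=DirectRunner on the command line.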

Thanks
Regards

On Thu, Jan 9, 2020 at 11:56 PM Kenneth Knowles <k...@apache.org> wrote:

> Does it have the same behavior in the direct runner? What are the sizes of
> intermediate PCollections?
>
> Kenn
>
> On Wed, Jan 8, 2020 at 1:05 PM Andrés Garagiola <andresgaragi...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I'm doing some tests with Beam and Apache Flink. I'm running the code
>> below:
>>
>>   public static void main(String[] args) throws IOException {
>>     WorkflowStepOptions options = PipelineOptionsFactory.fromArgs(args)
>>             .withValidation()
>>             .as(WorkflowStepOptions.class);
>>     logger.info("Options Kafka server {} input topic {} output topic {} window size {} group id {} step name {}",
>>             options.getKafkaBrokers(), options.getTopics(), options.getOutputTopic(),
>>             options.getWindowSize(), options.getGroupId(), workflowStepName);
>>     Pipeline p = Pipeline.create(options);
>>
>>     // Register a custom coder so MyClass can be serialized by the runner.
>>     CoderRegistry cr = p.getCoderRegistry();
>>     cr.registerCoderForClass(MyClass.class, new MyClassCoder());
>>
>>     // Read KV<Integer, MyClass> records from Kafka, stamping each element
>>     // with its event date when one is present.
>>     KafkaIO.Read<Integer, MyClass> kafkaIOReader = KafkaIO.<Integer, MyClass>read()
>>             .withBootstrapServers(options.getKafkaBrokers())
>>             .withTopics(Arrays.asList(options.getTopics().split(",")))
>>             .withKeyDeserializer(IntegerDeserializer.class)
>>             .withValueDeserializer(MyClassEventDeserializer.class)
>>             //.withTimestampPolicyFactory(new MyClassTimestampPolicyFactory())
>>             .withTimestampFn((KV<Integer, MyClass> event) ->
>>                     event.getValue().getDate() == null
>>                             ? Instant.now()
>>                             : Instant.parse(event.getValue().getDate(),
>>                                     DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")))
>>             .withConsumerConfigUpdates(
>>                     ImmutableMap.of(
>>                             "group.id", options.getGroupId(),
>>                             "auto.offset.reset", "earliest"));
>>
>>     // Write the formatted counts back to Kafka as String/String records.
>>     KafkaIO.Write<String, String> kafkaOutput = KafkaIO.<String, String>write()
>>             .withBootstrapServers(options.getKafkaBrokers())
>>             .withTopic(options.getOutputTopic())
>>             .withKeySerializer(StringSerializer.class)
>>             .withValueSerializer(StringSerializer.class);
>>
>>     // Fixed windows with early firings one second after the first element
>>     // in a pane and late firings on every late element, accumulating panes.
>>     Window<KV<Integer, MyClass>> window = Window
>>             .<KV<Integer, MyClass>>into(
>>                     FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
>>             .accumulatingFiredPanes()
>>             .withAllowedLateness(Duration.standardDays(365L))
>>             .triggering(AfterWatermark.pastEndOfWindow()
>>                     .withEarlyFirings(AfterProcessingTime
>>                             .pastFirstElementInPane()
>>                             .plusDelayOf(Duration.standardSeconds(1L)))
>>                     .withLateFirings(AfterPane.elementCountAtLeast(1)));
>>
>>     // Count the elements in each window pane.
>>     PCollection<Long> toFormat = p.apply(kafkaIOReader.withoutMetadata())
>>             .apply("Window", window)
>>             .apply(Combine.globally(
>>                     Count.<KV<Integer, MyClass>>combineFn()).withoutDefaults());
>>
>>     // Format each count as a KV<String, String> and publish it.
>>     toFormat
>>             .apply("FormatResults", MapElements
>>                     .into(TypeDescriptors.kvs(
>>                             TypeDescriptors.strings(), TypeDescriptors.strings()))
>>                     .via((Long count) -> KV.of("count", count.toString())))
>>             .apply(kafkaOutput);
>>
>>     p.run();
>>   }
>>
>> The idea is very simple: read some events from a Kafka topic, group them
>> into a window, count them, and put the result in another Kafka topic.
>>
>> I'm a little confused by the result: the code above only produces a
>> single entry counting "1" element, while there are many (around 500)
>> events in the source topic.
>>
>> Do you have any suggestions for figuring this out? Is there something
>> I'm doing wrong here?
>>
>> Regards
>>
>
