Hi Kenneth,

Thanks for your reply. I realized that the issue happens when I run the Count over the PCollection<KV<Integer, MyClass>>, but it works well if I first map the collection to a primitive value type, for example PCollection<KV<Integer, String>>.

I don't think it is a trigger issue, but rather something with my type. One thing I tried was implementing my own CombineFn and logging the rows; when I use MyClass, only the first element is processed by the function (see the sketch below). I haven't tried the direct runner yet; I will try that. The size of the collection is about 600 records.
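For reference, this is roughly the diagnostic CombineFn I mentioned. It is a sketch only: the class name, the plain Long accumulator, and the log format are illustrative, not my exact code.

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Diagnostic combine: counts elements and logs every row that reaches addInput.
public class LoggingCountFn extends Combine.CombineFn<KV<Integer, MyClass>, Long, Long> {

  private static final Logger logger = LoggerFactory.getLogger(LoggingCountFn.class);

  @Override
  public Long createAccumulator() {
    return 0L;
  }

  @Override
  public Long addInput(Long accumulator, KV<Integer, MyClass> input) {
    // With MyClass values this logs only the first element; with String values
    // it logs every element, which is what made me suspect my type.
    logger.info("addInput key={} value={}", input.getKey(), input.getValue());
    return accumulator + 1;
  }

  @Override
  public Long mergeAccumulators(Iterable<Long> accumulators) {
    long sum = 0L;
    for (Long acc : accumulators) {
      sum += acc;
    }
    return sum;
  }

  @Override
  public Long extractOutput(Long accumulator) {
    return accumulator;
  }
}

I plug it in with .apply(Combine.globally(new LoggingCountFn()).withoutDefaults()) in place of the Count.combineFn() step.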
Thanks,
Regards

On Thu, Jan 9, 2020 at 11:56 PM Kenneth Knowles <k...@apache.org> wrote:

> Does it have the same behavior in the direct runner? What are the sizes of
> intermediate PCollections?
>
> Kenn
>
> On Wed, Jan 8, 2020 at 1:05 PM Andrés Garagiola <andresgaragi...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I'm doing some tests with Apache Beam and Apache Flink. I'm running the
>> code below:
>>
>> public static void main(String[] args) throws IOException {
>>   WorkflowStepOptions options = PipelineOptionsFactory.fromArgs(args)
>>       .withValidation()
>>       .as(WorkflowStepOptions.class);
>>   logger.info("Options Kafka server {} input topic {} output topic {} window size {} group id {} step name {}",
>>       options.getKafkaBrokers(), options.getTopics(), options.getOutputTopic(),
>>       options.getWindowSize(), options.getGroupId(), workflowStepName);
>>   Pipeline p = Pipeline.create(options);
>>
>>   CoderRegistry cr = p.getCoderRegistry();
>>   cr.registerCoderForClass(MyClass.class, new MyClassCoder());
>>
>>   KafkaIO.Read<Integer, MyClass> kafkaIOReader = KafkaIO.<Integer, MyClass>read()
>>       .withBootstrapServers(options.getKafkaBrokers())
>>       .withTopics(Arrays.asList(options.getTopics().split(",")))
>>       .withKeyDeserializer(IntegerDeserializer.class)
>>       .withValueDeserializer(MyClassEventDeserializer.class)
>>       //.withTimestampPolicyFactory(new MyClassTimestampPolicyFactory())
>>       .withTimestampFn((KV<Integer, MyClass> event) ->
>>           event.getValue().getDate() == null
>>               ? Instant.now()
>>               : Instant.parse(event.getValue().getDate(),
>>                   DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")))
>>       .withConsumerConfigUpdates(
>>           ImmutableMap.of(
>>               "group.id", options.getGroupId(),
>>               "auto.offset.reset", "earliest"));
>>
>>   KafkaIO.Write<String, String> kafkaOutput = KafkaIO.<String, String>write()
>>       .withBootstrapServers(options.getKafkaBrokers())
>>       .withTopic(options.getOutputTopic())
>>       .withKeySerializer(StringSerializer.class)
>>       .withValueSerializer(StringSerializer.class);
>>
>>   Window<KV<Integer, MyClass>> window = Window
>>       .<KV<Integer, MyClass>>into(
>>           FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
>>       .accumulatingFiredPanes()
>>       .withAllowedLateness(Duration.standardDays(365L))
>>       .triggering(AfterWatermark.pastEndOfWindow()
>>           .withEarlyFirings(AfterProcessingTime
>>               .pastFirstElementInPane()
>>               .plusDelayOf(Duration.standardSeconds(1L)))
>>           .withLateFirings(AfterPane.elementCountAtLeast(1)));
>>
>>   PCollection<Long> toFormat = p.apply(kafkaIOReader.withoutMetadata())
>>       .apply("Window", window)
>>       .apply(Combine.globally(Count.<KV<Integer, MyClass>>combineFn()).withoutDefaults());
>>
>>   toFormat
>>       .apply("FormatResults",
>>           MapElements
>>               .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>>               .via((Long count) -> KV.of("count", count.toString())))
>>       .apply(kafkaOutput);
>>
>>   p.run();
>> }
>>
>> The idea is very simple: read some events from a Kafka topic, group them
>> into a window, count them, and write the result to another Kafka topic.
>>
>> I'm a little confused by the result: the code above only produces one
>> entry, counting "1" element, while I have many events (around 500) in the
>> source topic.
>>
>> Do you have any suggestions to figure this out? Is there something I'm
>> doing wrong here?
>>
>> Regards
>>
>
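PS: This is the kind of minimal direct-runner check I plan to try. It is only a sketch: the MyClass test constructor and the sample values are made up, and only the coder registration and the Count step match my real pipeline.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Minimal check: the same coder and the same Count combine as the real
// pipeline, but with a bounded in-memory source instead of Kafka. With no
// --runner flag this runs on the direct runner.
public class DirectRunnerCheck {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KV<Integer, MyClass>> events = p.apply(
        Create.of(
                KV.of(1, new MyClass("a")), // assumes a simple test constructor
                KV.of(2, new MyClass("b")),
                KV.of(3, new MyClass("c")))
            .withCoder(KvCoder.of(VarIntCoder.of(), new MyClassCoder())));

    PCollection<Long> counted = events.apply(
        Combine.globally(Count.<KV<Integer, MyClass>>combineFn()).withoutDefaults());

    // Fails if the count is anything other than 3, e.g. the "1" I see on Flink.
    PAssert.thatSingleton(counted).isEqualTo(3L);

    p.run().waitUntilFinish();
  }
}

If this also counts 1 instead of 3, I would suspect MyClassCoder (for example, a decode that loses data or is non-deterministic) rather than the windowing or triggers.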