Hi all, I'm doing some tests with Apache Beam on the Flink runner. I'm running the code below:
public static void main(String[] args) throws IOException {
    WorkflowStepOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(WorkflowStepOptions.class);
    logger.info("Options Kafka server {} input topic {} output topic {} window size {} group id {} step name {}",
        options.getKafkaBrokers(), options.getTopics(), options.getOutputTopic(),
        options.getWindowSize(), options.getGroupId(), workflowStepName);

    Pipeline p = Pipeline.create(options);

    // Register a coder for the custom event type.
    CoderRegistry cr = p.getCoderRegistry();
    cr.registerCoderForClass(MyClass.class, new MyClassCoder());

    // Read from Kafka; event timestamps come from the payload's date field,
    // falling back to processing time when the date is null.
    KafkaIO.Read<Integer, MyClass> kafkaIOReader = KafkaIO.<Integer, MyClass>read()
        .withBootstrapServers(options.getKafkaBrokers())
        .withTopics(Arrays.asList(options.getTopics().split(",")))
        .withKeyDeserializer(IntegerDeserializer.class)
        .withValueDeserializer(MyClassEventDeserializer.class)
        //.withTimestampPolicyFactory(new MyClassTimestampPolicyFactory())
        .withTimestampFn((KV<Integer, MyClass> event) ->
            event.getValue().getDate() == null
                ? Instant.now()
                : Instant.parse(event.getValue().getDate(),
                    DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")))
        .withConsumerConfigUpdates(ImmutableMap.of(
            "group.id", options.getGroupId(),
            "auto.offset.reset", "earliest"));

    KafkaIO.Write<String, String> kafkaOutput = KafkaIO.<String, String>write()
        .withBootstrapServers(options.getKafkaBrokers())
        .withTopic(options.getOutputTopic())
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class);

    // Fixed windows of windowSize minutes: fire early every second after the
    // first element in a pane, once per late element, accumulating across panes.
    Window<KV<Integer, MyClass>> window = Window
        .<KV<Integer, MyClass>>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardDays(365L))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1L)))
            .withLateFirings(AfterPane.elementCountAtLeast(1)));

    // Count all elements per window; withoutDefaults() avoids emitting a
    // default value for empty windows.
    PCollection<Long> toFormat = p.apply(kafkaIOReader.withoutMetadata())
        .apply("Window", window)
        .apply(Combine.globally(Count.<KV<Integer, MyClass>>combineFn()).withoutDefaults());

    // Format each count as a KV and write it to the output topic.
    toFormat
        .apply("FormatResults", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((Long count) -> KV.of("count", count.toString())))
        .apply(kafkaOutput);

    p.run();
}

The idea is very simple: read some events from a Kafka topic, group them into a window, count them, and put the result in another Kafka topic. I'm a little confused by the result: the code above only produces a single entry with a count of "1", while the source topic contains a lot of events (around 500). Do you have any suggestions to help me figure this out? Is there something I'm doing wrong here?
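In case it helps with debugging, this is a small diagnostic DoFn I'm planning to wire in between the Combine and the FormatResults step (the class name is my own invention); it logs the window, the pane info (EARLY/ON_TIME/LATE), and the value of every count that fires:

static class LogPanesFn extends DoFn<Long, Long> {
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
        // Log which window each count belongs to, which pane fired it, and when.
        logger.info("window={} pane={} timestamp={} count={}",
            window, c.pane(), c.timestamp(), c.element());
        c.output(c.element());
    }
}

hooked up with toFormat.apply("LogPanes", ParDo.of(new LogPanesFn())) before FormatResults. That should at least show whether the single "1" is one early pane that is never followed by further firings, or whether only one element ever makes it into the window.

Regards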