Hi,
you should be able to set it using something like:
kafkaIO.updateConsumerProperties(
ImmutableMap.<String, Object>of("group.id", <my group id>))
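In case it helps, here is a minimal sketch of building that property map with plain java.util instead of Guava. The class name ConsumerProps, the helper withGroupId and the group id "my-beam-app" are made up for illustration. The commented-out pipeline line only reuses the builder calls from your mail and is an assumption about how you would wire it in, not something I have run against your setup.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerProps {

    // Builds the consumer property overrides that carry the Kafka "group.id".
    // Hypothetical helper, not part of Beam or Kafka.
    static Map<String, Object> withGroupId(String groupId) {
        if (groupId == null || groupId.isEmpty()) {
            throw new IllegalArgumentException("group.id must be non-empty");
        }
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = withGroupId("my-beam-app");
        System.out.println(props);

        // Assumed usage, with Beam's KafkaIO on the classpath:
        // KafkaIO.read()
        //     .withBootstrapServers("host:9092")
        //     .withTopics(topics)
        //     .updateConsumerProperties(props);
    }
}
```

The map only overrides the keys you put into it, so "group.id" is the one entry needed to get past the ConfigException.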
Cheers,
Aljoscha
On Sun, 8 May 2016 at 06:35 amir bahmanyari <[email protected]> wrote:
> Hi colleagues,
> Hope you are having a great weekend.
> I get a Kafka configuration exception, GroupId not being set, when trying
> to run my Beam app (Flink runner) in a Flink cluster.
>
>
> I couldn't find a reference to a method that sets a GroupId similar to
> what we do for
> withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords etc.
> The bottom of the stack trace is provided below.
> How can I set a GroupId property for KafkaIO.read()?
> Thanks for your help.
>
> ... 25 more
> Caused by: org.apache.kafka.common.config.ConfigException: Missing
> required configuration "group.id" which has no default value.
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
> at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
> at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
> at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
> at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
> at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
> at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
> at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
> ... 27 more
>
>