Ah sorry, I didn't see the other thread...
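
For anyone landing on this thread first, here is a minimal sketch of the
override map that the suggestion quoted below passes to
updateConsumerProperties. It uses only java.util so it runs without Beam or
Kafka on the classpath. ConsumerOverrides, withGroupId and the group name
"my-beam-app" are made-up names for illustration, not part of either library.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Beam or Kafka) that builds consumer property overrides. */
final class ConsumerOverrides {
    private ConsumerOverrides() {}

    /** Returns a read-only map holding only the "group.id" consumer property. */
    static Map<String, Object> withGroupId(String groupId) {
        // Fail early here rather than with a ConfigException when the consumer is created.
        if (groupId == null || groupId.trim().isEmpty()) {
            throw new IllegalArgumentException("group.id must be a non-empty string");
        }
        Map<String, Object> overrides = new HashMap<>();
        overrides.put("group.id", groupId);
        return Collections.unmodifiableMap(overrides);
    }

    public static void main(String[] args) {
        Map<String, Object> overrides = withGroupId("my-beam-app");
        System.out.println(overrides);

        // Assumed usage with Beam and Kafka on the classpath, following the
        // quoted suggestion (not compiled here):
        //   KafkaIO.read()
        //       .withBootstrapServers("host:9092")
        //       .withTopics(topics)
        //       .updateConsumerProperties(overrides);
    }
}
```

The map is keyed by the plain Kafka property name, so other consumer settings
could be added to it the same way. As far as I know, later Beam releases
replace updateConsumerProperties with withConsumerConfigUpdates, so check the
KafkaIO javadoc for the version you run.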

On Sun, 8 May 2016 at 07:05 Aljoscha Krettek wrote:

> Hi,
> you should be able to set it using something like:
>
> kafkaIO.updateConsumerProperties(
>   ImmutableMap.<String, Object>of("group.id", <my group id>))
>
> Cheers,
> Aljoscha
>
> On Sun, 8 May 2016 at 06:35 amir bahmanyari wrote:
>
>> Hi colleagues,
>> Hope you are having a great weekend.
>> I get a Kafka configuration exception, GroupId not being set, when
>> trying to run my Beam app (Flink runner) in a Flink cluster.
>>
>>
>> I couldn't find a reference to a method that sets a GroupId similar to
>> what we do for
>> withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords etc.
>> The bottom of the stack trace is provided below.
>> How can I set a GroupId property for KafkaIO.read()?
>> Thanks for your help.
>>
>>         ... 25 more
>> Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.
>>         at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
>>         at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
>>         at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
>>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
>>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
>>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
>>         at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
>>         at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>>         at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>>         at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
>>         at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
>>         ... 27 more
>>
>>
