Can you enable DEBUG logs? They would help with debugging.

-- Kamal
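
One thing that can be checked from the INFO output already quoted in this thread, before any DEBUG logs arrive: the endpoint reported at route startup reads topic=test and groupId=testing, while the route in the original post uses topic=test1 and groupId=testingGroupNew. That may simply mean the test was rerun with different values, but it is worth ruling out. The following is a minimal sketch, not part of Camel or Kafka, that compares the query options of two endpoint URIs; the class name EndpointUriCheck and its helper methods are hypothetical names introduced for illustration, and the two URIs are copied from the thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper for illustration only; not a Camel or Kafka API.
class EndpointUriCheck {

    /** Splits the query part of an endpoint URI into key/value pairs. */
    static Map<String, String> queryParams(String endpointUri) {
        Map<String, String> params = new LinkedHashMap<>();
        int q = endpointUri.indexOf('?');
        if (q < 0) {
            return params; // no query string at all
        }
        for (String pair : endpointUri.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue; // tolerate "a=1&&b=2" or a trailing "&"
            }
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            params.put(key, value);
        }
        return params;
    }

    /** Returns the given keys whose values differ between the two URIs. */
    static List<String> differingKeys(String uriA, String uriB, String... keys) {
        Map<String, String> a = queryParams(uriA);
        Map<String, String> b = queryParams(uriB);
        List<String> diff = new ArrayList<>();
        for (String key : keys) {
            if (!Objects.equals(a.get(key), b.get(key))) {
                diff.add(key);
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        // URI as written in the route definition in the original post.
        String configured =
            "kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest";
        // URI as reported by DefaultCamelContext in the quoted startup log.
        String logged =
            "kafka://localhost:9092?autoOffsetReset=earliest&consumersCount=1&groupId=testing&topic=test";

        System.out.println("Options that differ: "
            + differingKeys(configured, logged, "topic", "groupId", "autoOffsetReset"));
    }
}
```

If the topic or group id reported in the log is not the one being produced to, the route would start cleanly and still never print anything, which would match the symptom described. If they do match in the actual run, the DEBUG logs are the next step.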

On Mon, Jan 9, 2017 at 5:37 AM, Gupta, Swati <swati.gu...@anz.com> wrote:

> Hello All,
>
> Any help on this would be appreciated.
> There seems to be no error. Does it look like a version issue?
>
> I have updated my pom.xml with the below:
> <dependency>
>     <groupId>org.springframework.kafka</groupId>
>     <artifactId>spring-kafka</artifactId>
>     <version>1.1.2.BUILD-SNAPSHOT</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.camel</groupId>
>     <artifactId>camel-kafka</artifactId>
>     <version>2.17.0</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>0.10.1.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka_2.11</artifactId>
>     <version>0.10.1.0</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.camel</groupId>
>     <artifactId>camel-core</artifactId>
>     <version>2.17.0</version>
> </dependency>
>
> Thanks & Regards
> Swati
>
> -----Original Message-----
> From: Gupta, Swati [mailto:swati.gu...@anz.com]
> Sent: Friday, 6 January 2017 4:01 PM
> To: users@kafka.apache.org
> Subject: RE: Apache Kafka integration using Apache Camel
>
> Yes, the kafka console consumer displays the message correctly.
> I also tested the same with a Java application, it works fine. There seems
> to be an issue with Camel route trying to consume.
>
> There is no error in the console.
> But, the logs show as below:
> kafka.KafkaCamelTestConsumer
> Connected to the target VM, address: '127.0.0.1:65007', transport: 'socket'
> PID_IS_UNDEFINED: INFO DefaultCamelContext - Apache Camel 2.17.0 (CamelContext: camel-1) is starting
> PID_IS_UNDEFINED: INFO ManagedManagementStrategy - JMX is enabled
> PID_IS_UNDEFINED: INFO DefaultTypeConverter - Loaded 183 type converters
> PID_IS_UNDEFINED: INFO DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
> PID_IS_UNDEFINED: INFO DefaultCamelContext - AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
> PID_IS_UNDEFINED: INFO DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
> PID_IS_UNDEFINED: INFO KafkaConsumer - Starting Kafka consumer
> PID_IS_UNDEFINED: INFO ConsumerConfig - ConsumerConfig values:
>     auto.commit.interval.ms = 5000
>     auto.offset.reset = earliest
>     bootstrap.servers = [localhost:9092]
>     check.crcs = true
>     client.id =
>     connections.max.idle.ms = 540000
>     enable.auto.commit = true
>     exclude.internal.topics = true
>     fetch.max.bytes = 52428800
>     fetch.max.wait.ms = 500
>     fetch.min.bytes = 1024
>     group.id = testing
>     heartbeat.interval.ms = 3000
>     interceptor.classes = null
>     key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>     max.partition.fetch.bytes = 1048576
>     max.poll.interval.ms = 300000
>     max.poll.records = 500
>     metadata.max.age.ms = 300000
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.sample.window.ms = 30000
>     partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>     receive.buffer.bytes = 32768
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 40000
>     retry.backoff.ms = 100
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 60000
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.mechanism = GSSAPI
>     security.protocol = PLAINTEXT
>     send.buffer.bytes = 131072
>     session.timeout.ms = 30000
>     ssl.cipher.suites = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.endpoint.identification.algorithm = null
>     ssl.key.password = null
>     ssl.keymanager.algorithm = SunX509
>     ssl.keystore.location = null
>     ssl.keystore.password = null
>     ssl.keystore.type = JKS
>     ssl.protocol = TLS
>     ssl.provider = null
>     ssl.secure.random.implementation = null
>     ssl.trustmanager.algorithm = PKIX
>     ssl.truststore.location = null
>     ssl.truststore.password = null
>     ssl.truststore.type = JKS
>     value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>
> PID_IS_UNDEFINED: INFO ConsumerConfig - ConsumerConfig values:
>     auto.commit.interval.ms = 5000
>     auto.offset.reset = earliest
>     bootstrap.servers = [localhost:9092]
>     check.crcs = true
>     client.id = consumer-1
>     connections.max.idle.ms = 540000
>     enable.auto.commit = true
>     exclude.internal.topics = true
>     fetch.max.bytes = 52428800
>     fetch.max.wait.ms = 500
>     fetch.min.bytes = 1024
>     group.id = testing
>     heartbeat.interval.ms = 3000
>     interceptor.classes = null
>     key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>     max.partition.fetch.bytes = 1048576
>     max.poll.interval.ms = 300000
>     max.poll.records = 500
>     metadata.max.age.ms = 300000
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.sample.window.ms = 30000
>     partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>     receive.buffer.bytes = 32768
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 40000
>     retry.backoff.ms = 100
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 60000
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.mechanism = GSSAPI
>     security.protocol = PLAINTEXT
>     send.buffer.bytes = 131072
>     session.timeout.ms = 30000
>     ssl.cipher.suites = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.endpoint.identification.algorithm = null
>     ssl.key.password = null
>     ssl.keymanager.algorithm = SunX509
>     ssl.keystore.location = null
>     ssl.keystore.password = null
>     ssl.keystore.type = JKS
>     ssl.protocol = TLS
>     ssl.provider = null
>     ssl.secure.random.implementation = null
>     ssl.trustmanager.algorithm = PKIX
>     ssl.truststore.location = null
>     ssl.truststore.password = null
>     ssl.truststore.type = JKS
>     value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>
> PID_IS_UNDEFINED: INFO AppInfoParser - Kafka version : 0.10.1.0
> PID_IS_UNDEFINED: INFO AppInfoParser - Kafka commitId : 3402a74efb23d1d4
> PID_IS_UNDEFINED: INFO DefaultCamelContext - Route: route1 started and consuming from: Endpoint[kafka://localhost:9092?autoOffsetReset=earliest&consumersCount=1&groupId=testing&topic=test]
> PID_IS_UNDEFINED: INFO DefaultCamelContext - Total 1 routes, of which 1 are started.
> PID_IS_UNDEFINED: INFO DefaultCamelContext
>
> -----Original Message-----
> From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
> Sent: Friday, 6 January 2017 3:58 PM
> To: users@kafka.apache.org
> Subject: Re: Apache Kafka integration using Apache Camel
>
> More generally, do you have any log errors/messages or additional info?
> It's tough to debug issues like this from 3rd party libraries if they
> don't provide logs/exception info that indicates why processing a specific
> message failed.
>
> -Ewen
>
> On Thu, Jan 5, 2017 at 8:29 PM, UMESH CHAUDHARY <umesh9...@gmail.com> wrote:
>
> > Did you test that kafka console consumer is displaying the produced
> > message?
> >
> > On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <swati.gu...@anz.com> wrote:
> >
> > > Hello All,
> > >
> > > I am trying to create a Consumer using Apache Camel for a topic in
> > > Apache Kafka.
> > > I am using Camel 2.17.0 and Kafka 0.10 and JDK 1.8.
> > > I have attached a file, KafkaCamelTestConsumer.java, which is a
> > > standalone application trying to read from a topic "test1" created
> > > in Apache Kafka. I am producing messages from the console and was
> > > also able to produce messages to the topic "test1" using a Camel
> > > program, but I am not able to consume messages. Ideally, they should
> > > get printed, but nothing seems to happen. The log says that the
> > > route has started but does not process any message.
> > >
> > > Please help to confirm if there is anything wrong with the below syntax:
> > >
> > > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
> > >         + "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
> > >         + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
> > >         + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
> > >     .split()
> > >     .body()
> > >     .process(new Processor() {
> > >         @Override
> > >         public void process(Exchange exchange) throws Exception {
> > >             String messageKey = "";
> > >             if (exchange.getIn() != null) {
> > >                 Message message = exchange.getIn();
> > >                 Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
> > >                 String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
> > >                 if (message.getHeader(KafkaConstants.KEY) != null)
> > >                     messageKey = (String) message.getHeader(KafkaConstants.KEY);
> > >                 Object data = message.getBody();
> > >
> > >                 System.out.println(
> > >                     "topicName :: " + topicName
> > >                     + " partitionId :: " + partitionId
> > >                     + " messageKey :: " + messageKey
> > >                     + " message :: " + data + "\n");
> > >             }
> > >         }
> > >     }).to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
> > > }
> > > });
> > >
> > > I have also tried with the basic parameters as below and it still
> > > fails to read messages.
> > >
> > > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")
> > >
> > > Any help on this will be greatly appreciated.
> > >
> > > Thanks in advance
> > >
> > > Thanks & Regards
> > >
> > > Swati
> > >
> > > ------------------------------
> > > This e-mail and any attachments to it (the "Communication") is,
> > > unless otherwise stated, confidential, may contain copyright
> > > material and is for the use only of the intended recipient. If you
> > > receive the Communication in error, please notify the sender
> > > immediately by return e-mail, delete the Communication and the
> > > return e-mail, and do not read, copy, retransmit or otherwise deal
> > > with it.
> > > Any views expressed in the Communication are those of the
> > > individual sender only, unless expressly stated to be those of
> > > Australia and New Zealand Banking Group Limited ABN 11 005 357 522,
> > > or any of its related entities including ANZ Bank New Zealand
> > > Limited (together "ANZ"). ANZ does not accept liability in
> > > connection with the integrity of or errors in the Communication,
> > > computer virus, data corruption, interference or delay arising from
> > > or in respect of the Communication.