How to keep consumers alive without polling new messages
Hi all, 0.10 consumers use the poll() method to heartbeat to the Kafka brokers. Is there any way I can make the consumer heartbeat without polling any messages? The javadoc says the recommended way is to move message processing to another thread. But when message processing keeps failing (because a third-party service goes down for a while), the thread that actually processes messages could accumulate too many messages. Maybe re-sending failed messages to another queue (IMQ) and re-processing them later is a good option? Thanks! -- Yifan
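One way to keep heartbeating without taking on more work, offered as a sketch rather than as the thread's answer: the 0.10 Java client lets the polling thread call `consumer.pause(consumer.assignment())` and keep calling `poll()`, which still heartbeats but returns no records for paused partitions, then call `consumer.resume(consumer.assignment())` once the worker has caught up. The plain-Java gate below (hypothetical class `BacklogGate`; the watermark values are illustrative) only decides when to pause or resume based on the worker's backlog; the Kafka calls would go where the comments say.

```java
// Hypothetical helper, not part of the Kafka client: decides when the polling
// thread should pause or resume fetching, based on how many records the worker
// thread still has queued. Two watermarks avoid flapping around a single limit.
final class BacklogGate {
    private final int highWatermark;
    private final int lowWatermark;
    private boolean paused = false;

    BacklogGate(int highWatermark, int lowWatermark) {
        if (lowWatermark >= highWatermark) {
            throw new IllegalArgumentException("lowWatermark must be below highWatermark");
        }
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
    }

    /**
     * Returns true only on the transition into the paused state. This is where
     * a 0.10 consumer would call consumer.pause(consumer.assignment()) and then
     * keep calling poll(), which heartbeats but returns nothing for paused partitions.
     */
    boolean shouldPause(int backlog) {
        if (!paused && backlog >= highWatermark) {
            paused = true;
            return true;
        }
        return false;
    }

    /**
     * Returns true only on the transition back to fetching. This is where the
     * consumer would call consumer.resume(consumer.assignment()).
     */
    boolean shouldResume(int backlog) {
        if (paused && backlog <= lowWatermark) {
            paused = false;
            return true;
        }
        return false;
    }

    boolean isPaused() {
        return paused;
    }
}
```

The retry-queue idea from the message still applies for records that fail permanently; the gate only keeps the in-memory backlog bounded while the third-party service is down.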
Fwd: Kafka Defunct Sockets
Hi, This is Magesh, working as an engineer at Visa Inc. I'm relatively new to the Kafka ecosystem. We are using Kafka 0.9, and during testing in our test environments we have noticed that the producer retries with NETWORK_EXCEPTION. To debug the issue, I enabled TRACE logging and noticed that the nodes were added to the disconnected list and hence were being retried.

From the producer code, I noticed that the following would be the only scenario where a node is marked disconnected:

            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }

Upon careful analysis, I didn't find any logs related to the exception block, so the only possibility is that the sockets were becoming defunct. With netstat, I found that the sockets were getting dropped periodically. I wasn't sure whether it was the producer, the broker or the network layer that's causing this. Just wanted to check if there is any recommendation for this. We are using SASL. Thanks Magesh
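To make the quoted check concrete: a `SelectionKey` stops being valid once the key is cancelled or its channel is closed, and `!key.isValid()` reports only that fact, not why it happened. The JDK-only sketch below (hypothetical class `DefunctSweep` and string connection ids, not Kafka's `Selector`) reproduces the check against real NIO keys.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// JDK-only illustration of the quoted "cancel any defunct sockets" check.
// DefunctSweep and the string ids are hypothetical; Kafka's own Selector
// keeps richer per-channel state.
final class DefunctSweep {
    /** Collects the ids whose key is no longer valid, like disconnected.add(channel.id()). */
    static List<String> sweep(Map<String, SelectionKey> keysById) {
        List<String> disconnected = new ArrayList<>();
        for (Map.Entry<String, SelectionKey> entry : keysById.entrySet()) {
            // isValid() turns false once the key is cancelled or its channel is closed.
            if (!entry.getValue().isValid()) {
                disconnected.add(entry.getKey());
            }
        }
        return disconnected;
    }

    /** Registers an unconnected, non-blocking channel just to obtain a real key. */
    static SelectionKey register(Selector selector) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        return channel.register(selector, SelectionKey.OP_CONNECT);
    }

    /** Two demo keys so both states can be observed side by side. */
    static Map<String, SelectionKey> demoKeys(Selector selector) throws IOException {
        Map<String, SelectionKey> keys = new LinkedHashMap<>();
        keys.put("node-1", register(selector));
        keys.put("node-2", register(selector));
        return keys;
    }
}
```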
Spark per topic num of partitions doubt
Hello, I have asked the question on Stack Overflow as well: http://stackoverflow.com/questions/39737201/spark-kafka-per-topic-number-of-partitions-map-not-honored I am confused about the "per-topic number of partitions" parameter when creating an input DStream using the KafkaUtils.createStream(...) method. I am pasting the question here; please help.

From [Spark Documentation][1]:

> parameter topicMap of KafkaUtils.createStream(...) method determines "per-topic number of Kafka partitions to consume"

[Javadoc here][2]

So, when I created a Kafka topic with 3 partitions and started a Spark receiver as

    Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put(topic, 1);
    JavaPairReceiverInputDStream<String, String> inputDStream =
        KafkaUtils.createStream(javaStreamingContext, zookeeperQuorum, groupId, topicMap);

I expected this receiver to receive messages from ONLY one of the 3 partitions that I created. However, when I check the offset checker, I see the following:

    Pid  Offset  logSize  Lag  Owner
    0    9       9        0    none
    1    11      11       0    none
    2    7       7        0    none

I expected this code to receive messages from one partition, and I thought I would then need to start more receivers (one per partition), as given in the [documentation here][3], to cover all Kafka topic partitions:

    int numStreams = 3;
    List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
    for (int i = 0; i < numStreams; i++) {
        kafkaStreams.add(KafkaUtils.createStream(...));
    }
    JavaPairDStream<String, String> unifiedStream =
        streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

So, my question is: can one receiver receive messages from all partitions? If so, what in the world does the topicMap (topic -> numPartitions) mean?

[1]: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
[2]: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html
[3]: http://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers
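For what it's worth, the Spark Streaming + Kafka integration guide describes the receiver-based createStream differently from the Javadoc wording quoted above: the number in topicMap is the number of consumer threads used within the single receiver, not a limit on partitions. The receiver's Kafka consumer then spreads all of the topic's partitions over those threads, which would explain why all three partitions show zero lag with one receiver and topicMap(topic -> 1). The sketch below is a simplified range-style split (hypothetical class `RangeSplit`) illustrating that assignment; it is not Spark or Kafka code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified range-style split of one topic's partitions over the consumer
// threads inside a single receiver. Thread t gets a contiguous block; the
// first (numPartitions % numThreads) threads get one extra partition.
final class RangeSplit {
    static List<List<Integer>> assign(int numPartitions, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be > 0");
        }
        int perThread = numPartitions / numThreads;
        int extra = numPartitions % numThreads;
        List<List<Integer>> result = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            int start = perThread * t + Math.min(t, extra);
            int count = perThread + (t < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                partitions.add(p);
            }
            result.add(partitions);
        }
        return result;
    }
}
```

With one thread, `assign(3, 1)` gives that thread partitions 0, 1 and 2; asking for more threads than partitions simply leaves some threads idle.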
Re: Schema Registry in Staging Environment
Lawrence, There are two common ways to approach registration of schemas. The first is to just rely on auto-registration that the serializers do (I'm assuming you're using the Java clients & serializers here, or an equivalent implementation in another language). In this case you can generally just allow the registration to happen as the updated application hits each stage. If it gets rejected in staging, it won't ever make it to prod. If you discover an issue in staging, the most common case is that it isn't a problem with the schema but rather with the surrounding code, in which case a subsequent deploy will generally be able to succeed. Note that unrelated changes can continue even if you end up not deploying the change to production, since they will not be affected by the newly registered schema. The second way is to build the registration into your deployment pipeline, so you may perform the registration before ever deploying an instance of the app. This generally requires more coordination between your deployment and apps (since deployment needs to know what schemas exist and how to register them in the appropriate environment), but allows you to catch errors a bit earlier (and may allow you to restrict writes to the schema registry to the machines performing deployment, which some shops may want to do). One of the goals of the schema registry is to help decouple developers within your organization, so it is absolutely common for developers to simply create new schemas. In fact, they may just build the entire process into their app. For example, while not released yet, we have a Maven plugin that you can use to integrate interactions with the schema registry into your Maven/Java application development process: https://github.com/confluentinc/schema-registry/tree/master/maven-plugin -Ewen On Mon, Sep 26, 2016 at 1:33 PM, Lawrence Weikum wrote: > Hello, > > Has anyone used Confluent’s Schema Registry?
If so, I’m curious to hear > about best practices for using it in a staging environment. > > Do users typically copy schemas over to the staging environment from > production? Are developers allowed to create new schemas in the staging > environment? > > Thanks! > > Lawrence Weikum > > -- Thanks, Ewen
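For the second approach Ewen describes (registering schemas from the deployment pipeline), a deployment step could POST each schema to the registry of the target environment before rolling out the app. This sketch only builds the request with the JDK 11+ `java.net.http` API and sends nothing; it assumes the registry's `POST /subjects/{subject}/versions` endpoint and the `application/vnd.schemaregistry.v1+json` content type, and the registry URL and subject name in the usage are placeholders (the subject follows the Avro serializer's default `<topic>-value` naming).

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds (but does not send) a schema-registration request for a deployment step.
// SchemaRegistration is a hypothetical helper; URL and subject come from the caller.
final class SchemaRegistration {
    static final String CONTENT_TYPE = "application/vnd.schemaregistry.v1+json";

    /** JSON-escapes the schema text so it can be embedded as a string value. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Request body of the form {"schema": "<escaped Avro schema JSON>"}. */
    static String body(String avroSchemaJson) {
        return "{\"schema\": \"" + escape(avroSchemaJson) + "\"}";
    }

    static HttpRequest registerRequest(String registryUrl, String subject, String avroSchemaJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(registryUrl + "/subjects/" + subject + "/versions"))
                .header("Content-Type", CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(body(avroSchemaJson)))
                .build();
    }
}
```

A deployment job would send it with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` against the staging registry first and stop the rollout on a non-2xx response.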
Kafka consumer receiving same message multiple times
Hi, I've recently started using Kafka to read documents coming through a web crawler. What I'm noticing is that when I'm dealing with a few million documents, the consumer processes the same message over and over again. It looks like the data is not getting committed for some reason. This is not the case when I'm testing the consumer with a few hundred messages. I'm using the Kafka high-level consumer client code in Java. I'm using a consumer group running on a number of threads equal to the number of partitions, so each thread is dedicated to a partition. Here's a code snippet for polling data:

    while (true) {
        try {
            if (consumerDao.canPollTopic()) {
                ConsumerRecords<String, TextAnalysisRequest> records =
                        consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
                for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                    if (record.value() != null) {
                        TextAnalysisRequest textAnalysisObj = record.value();
                        if (textAnalysisObj != null) {
                            PostProcessRequest req = new PostProcessRequest();
                            req.setRequest(this.getRequest(textAnalysisObj));
                            PreProcessorUtil.submitPostProcessRequest(req, config);
                        }
                    }
                }
            } else {
                Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
            }
        } catch (Exception ex) {
            LOGGER.error("Error in Full Consumer group worker", ex);
        }
    }

Here are the Kafka consumer configuration parameters I'm setting; the rest are default values:

    consumer.auto.commit=true
    consumer.auto.commit.interval=1000
    consumer.session.timeout=18
    consumer.poll.records=2147483647
    consumer.request.timeout=181000

Here's the complete consumer config:

    metric.reporters = []
    metadata.max.age.ms = 30
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = true
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 181000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 1000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
    group.id = full_group
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 6
    connections.max.idle.ms = 54
    session.timeout.ms = 18
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 3
    auto.offset.reset = latest

My sample Kafka queue has 8 partitions with a replication factor of 2. The log retention period in server.properties is set up as 168 hours:

    log.retention.hours=168
    log.roll.hours=168

Not sure what I'm missing here. Any pointers will be appreciated.
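A quick way to sanity-check settings like these, offered as a hedged sketch rather than a diagnosis: in the 0.10.0 Java consumer, heartbeats are sent from poll(), so if processing everything returned by one poll() takes longer than session.timeout.ms, the coordinator can evict the consumer, the group rebalances, and the partitions are re-read from the last committed offset. With max.poll.records at 2147483647, a single poll() can return a very large batch. The helper below (hypothetical class `PollBudget`; `perRecordMs` is a cost you would measure yourself, and the numbers in the usage are illustrative) does the arithmetic.

```java
// Back-of-the-envelope check: does one poll()'s worth of records fit inside the
// session timeout? All inputs are caller-supplied; nothing here talks to Kafka.
final class PollBudget {
    /** Worst-case time to process one batch returned by poll(). */
    static long batchMillis(long maxPollRecords, long perRecordMs) {
        return Math.multiplyExact(maxPollRecords, perRecordMs);
    }

    /** True if a full batch would take longer than the session timeout. */
    static boolean risksRebalance(long maxPollRecords, long perRecordMs, long sessionTimeoutMs) {
        return batchMillis(maxPollRecords, perRecordMs) > sessionTimeoutMs;
    }

    /** Largest max.poll.records that keeps a batch within safetyFraction of the timeout. */
    static long maxSafeRecords(long sessionTimeoutMs, long perRecordMs, double safetyFraction) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be > 0");
        }
        return Math.max(1, (long) Math.floor(sessionTimeoutMs * safetyFraction / perRecordMs));
    }
}
```

For example, at 50 ms per record and a 30000 ms session timeout with a 50% margin, `maxSafeRecords(30000, 50, 0.5)` gives 300 records per poll().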
Re: micro-batching in kafka streams
One more thing: Guozhang pointed me towards this sample for micro-batching: https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java This is a good example, and I successfully adapted it for my use case. BUT the main problem is that my use case deals with writing hourly windows of data, so the data is already in a RocksDB file, yet I need to create a duplicate of the same file just to be able to periodically do range scans on it and write to the external database. I did try to see if I could get a StateStore to read the same RocksDB file used by the aggregateByKey that happens before this step, but it complained about not being able to lock the file. It would be great to be able to share the same underlying file between aggregateByKey (or any other such KTable-producing operation) and such periodic triggers. Ara. On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi wrote: Hi, So, here’s the situation: - For classic batching of writes to external systems, right now I simply hack it. This specific case is writing records to an Accumulo database; I simply use the batch writer to batch writes, and it flushes every second or so. I’ve added a shutdown hook to the JVM to flush upon graceful exit too. This is good enough for me, but obviously it’s not perfect. I wish Kafka Streams had some sort of a trigger (based on x number of records processed, or y window of time passed). Which brings me to the next use case. - I have some logic for calculating hourly statistics, so I’m dealing with windowed data already. These stats then need to be written to an external database for use by user-facing systems. Obviously I need to write the final result for each hourly window after we’re past that window of time (or I can write as often as it gets updated, but the problem is that the external database is not as fast as Kafka).
I do understand that I need to take into account the fact that events may arrive out of order, and there may be some records arriving a little bit after I’ve considered the previous window over and have moved to the next one. I’d like to have some sort of an hourly trigger (not just a pure x-milliseconds trigger, but also support for cron-style timing), and then also have the option to update the stats I’ve already written for a window a set amount of time after the trigger fired, so that I can deal with events that arrive after the write for that window. And then there’s a cut-off point after which updating the stats for a very old window is just not worth it. Something like this DSL:

    kstream.trigger(
        /* when to trigger */ Cron.of("0 * * * *"),
        /* update every hour afterwards */ Hours.toMillis(1),
        /* discard changes older than this */ Hours.toMillis(24),
        /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ }
    );

The tricky part is reconciling event source time and event processing time. Clearly this trigger is in event processing time, whereas the data is most probably in event source time. Something like that :) Ara. On Sep 26, 2016, at 1:59 AM, Michael Noll wrote: Ara, may I ask why you need to use micro-batching in the first place? Reason why I am asking: typically, when people talk about micro-batching, they are referring to the way some originally batch-based stream processing tools "bolt on" real-time processing by making their batch sizes really small. Here, micro-batching belongs to the realm of the inner workings of the stream processing tool. Orthogonally to that, you have features/operations such as windowing, triggers, etc. that, unlike micro-batching, allow you as the user of the stream processing tool to define which exact computation logic you need. Whether or not, say, windowing is computed via micro-batching behind the scenes should (at least in an ideal world) be of no concern to the user.
-Michael On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi wrote: Hi, What’s the best way to do micro-batching in Kafka Streams? Any plans for a built-in mechanism? Perhaps StateStore could act as the buffer? What exactly are ProcessorContext.schedule()/punctuate() for? They don’t seem to be used anywhere? http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/ Ara.
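Neither trigger Ara describes is shown in this thread as something Kafka Streams provides out of the box, so here is a plain-Java sketch of both, with hypothetical names. `MicroBatcher` flushes after x records or y milliseconds (the caller passes in the current time, so it could be processing time or stream time), and `HourlyWindows` decides whether a late event may still update its hourly window given a cut-off such as the 24 hours in his proposed DSL.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Buffers records and hands back a batch when either maxRecords are buffered
// or maxAgeMs has passed since the first buffered record.
final class MicroBatcher<T> {
    private final int maxRecords;
    private final long maxAgeMs;
    private final List<T> buffer = new ArrayList<>();
    private long firstRecordAt = -1;

    MicroBatcher(int maxRecords, long maxAgeMs) {
        this.maxRecords = maxRecords;
        this.maxAgeMs = maxAgeMs;
    }

    /** Adds a record; returns the batch to write if a trigger fired, else an empty list. */
    List<T> add(T record, long nowMs) {
        if (buffer.isEmpty()) {
            firstRecordAt = nowMs;
        }
        buffer.add(record);
        return maybeFlush(nowMs);
    }

    /** Call periodically (e.g. from a timer) so a quiet stream still gets flushed. */
    List<T> maybeFlush(long nowMs) {
        boolean full = buffer.size() >= maxRecords;
        boolean old = !buffer.isEmpty() && nowMs - firstRecordAt >= maxAgeMs;
        if (!full && !old) {
            return Collections.emptyList();
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        firstRecordAt = -1;
        return batch;
    }
}

// Hourly tumbling windows with a cut-off after which late updates are dropped.
final class HourlyWindows {
    static final long HOUR_MS = 3_600_000L;

    static long windowStart(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, HOUR_MS);
    }

    /** True if an event may still update its window; false once past the cut-off. */
    static boolean acceptsUpdate(long eventTimeMs, long nowMs, long cutoffMs) {
        long windowEnd = windowStart(eventTimeMs) + HOUR_MS;
        return nowMs - windowEnd <= cutoffMs;
    }
}
```

Which clock feeds `nowMs` is exactly the event-time versus processing-time question Ara raises; the sketch leaves that choice to the caller.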
Kafka consumer picking up the same message multiple times
Hi, I've recently started using Kafka to read documents coming through a web crawler. What I'm noticing is that when I'm dealing with a few million documents, the consumer processes the same message over and over again. It looks like the data is not getting committed for some reason. This is not the case when I'm testing the consumer with a few hundred messages. I'm using the Kafka high-level consumer client code in Java. I'm using a consumer group running on a number of threads equal to the number of partitions. Here's a code snippet for polling data:

    while (true) {
        try {
            if (consumerDao.canPollTopic()) {
                ConsumerRecords<String, TextAnalysisRequest> records =
                        consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
                for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                    if (record.value() != null) {
                        TextAnalysisRequest textAnalysisObj = record.value();
                        if (textAnalysisObj != null) {
                            PostProcessRequest req = new PostProcessRequest();
                            req.setRequest(this.getRequest(textAnalysisObj));
                            PreProcessorUtil.submitPostProcessRequest(req, config);
                        }
                    }
                }
            } else {
                Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
            }
        } catch (Exception ex) {
            LOGGER.error("Error in Full Consumer group worker", ex);
        }
    }

Here are the Kafka consumer configuration parameters I'm setting; the rest are default values:

    consumer.auto.commit=true
    consumer.auto.commit.interval=1000
    consumer.session.timeout=18
    consumer.poll.records=2147483647
    consumer.request.timeout=181000

Here's the complete consumer config:

    metric.reporters = []
    metadata.max.age.ms = 30
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = true
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 181000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 1000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
    group.id = full_group
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 6
    connections.max.idle.ms = 54
    session.timeout.ms = 18
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 3
    auto.offset.reset = latest

My sample Kafka queue has 8 partitions with a replication factor of 2. The log retention period in server.properties is set up as 168 hours:

    log.retention.hours=168
    log.roll.hours=168

Not sure what I'm missing here. Any pointers will be appreciated. -Thanks, Shamik
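Whatever the root cause turns out to be, a consumer group can deliver records again after a rebalance or restart, starting from the last committed offset, so making the processing step tolerate repeats is a common safeguard. A minimal sketch, assuming records within a partition are handled in offset order and that keeping the state in memory is acceptable (hypothetical class `OffsetDeduper`, not a Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

// Remembers the highest offset already processed per partition so that a
// redelivered record (same partition, offset at or below the mark) is skipped.
// Not thread-safe: use one instance per consumer thread.
final class OffsetDeduper {
    private final Map<Integer, Long> lastProcessed = new HashMap<>();

    /** True if this (partition, offset) was already processed. */
    boolean isDuplicate(int partition, long offset) {
        Long last = lastProcessed.get(partition);
        return last != null && offset <= last;
    }

    /** Call after the record has been handled successfully. */
    void markProcessed(int partition, long offset) {
        // Never move the mark backwards.
        lastProcessed.merge(partition, offset, Long::max);
    }
}
```

Because this state lives only in one process, it does not help once a partition moves to a different consumer; durable de-duplication would need the last processed offset stored alongside the output.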
Re: Exception while deserializing in kafka streams
Ah, that was it. I was passing the same Serde while creating the topology. It works after I removed it. Thanks!

Walter

On Mon, Sep 26, 2016 at 1:16 PM, Guozhang Wang wrote:
> Hi Walter,
>
> One thing I can think of is that, if you pass the serde object as part of your topology definition, instead of passing the serde class in the config, then these serde objects will not be auto-configured, and hence in your case the schema registry client will not be constructed and initialized.
>
> https://issues.apache.org/jira/browse/KAFKA-3729
>
> So in case your application's topology does overwrite serdes with direct serde object passing, you need to configure them manually for now.
>
> Guozhang
>
> On Thu, Sep 22, 2016 at 5:36 PM, Guozhang Wang wrote:
>
> > Hi Walter,
> >
> > I downloaded the 0.10.0 jar and verified that the configure() function is auto-triggered when you get the serde classes from `context.keySerde / valueSerde`, which is auto-triggered if you use the DSL. And your Scala code is the same as our examples code:
> >
> > https://github.com/confluentinc/examples/blob/030343f9acbb9f73a13ab0f0fd31690dca97c606/kafka-streams/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerializer.java
> >
> > Which demo example were you running? And are there any other jars co-located with the 0.10.0.0 jar that could cause another class to be loaded?
> >
> > Guozhang
> >
> > On Thu, Sep 22, 2016 at 8:41 AM, Walter rakoff wrote:
> >
> >> Guozhang,
> >>
> >> I tried your suggestion. Below is the log from Serde, Serializer & Deserializer.
> >> Confirmed that KafkaAvroDeserializer.configure does get invoked.
> >>
> >> > Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In configure {num.standby.replicas=1, replication.factor=3, commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, schema.registry.url=http://10.200.184.41:8081, key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, zookeeper.connect=10.200.184.26:2181, value.serde=class kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, application.id=testing-app}
> >> > Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchemaRegistry: In configure{num.standby.replicas=1, replication.factor=3, commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, schema.registry.url=http://10.200.184.41:8081, key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, zookeeper.connect=10.200.184.26:2181, value.serde=class kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, application.id=testing-app}
> >> > Line 385: 16/09/22 15:28:46 WARN GenericAvroDeserializerWithSchemaRegistry: In configure{num.standby.replicas=1, replication.factor=3, commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092, schema.registry.url=http://10.200.184.41:8081, key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde, zookeeper.connect=10.200.184.26:2181, value.serde=class kstreams.serdes.GenericAvroSerdeWithSchemaRegistry, auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test, application.id=testing-app}
> >>
> >> Still the same exception
> >>
> >> > 16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete [StreamThread-1]
> >> > Exception in thread "StreamThread-1"
> >> > org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 7
> >> > Caused by: java.lang.NullPointerException
> >> >     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
> >> >     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
> >>
> >> Walter
> >>
> >> On Mon, Sep 19, 2016 at 8:25 PM, Guozhang Wang wrote:
> >>
> >> > Hello Walter,
> >> >
> >> > The WARN log entry should not be the cause of this issue.
> >> >
> >> > I double checked the 0.10.0.0 release and this issue should not really happen, so your observation is a bit weird to me. Could you add a log entry in the `configure` function which constructs the registry client to make sure it is indeed triggered when the streams app starts up?
> >> >
> >> > Guozhang
> >> >
> >> > On Fri, Sep 16, 2016 at 2:27 PM, Walter rakoff <walter.rak...@gmail.com> wrote:
> >> >
> >> > > Guozhang,
> >> > >
> >> > > Any clues on this one?
> >> > >
> >> > > Walter
> >> > >
> >> > > On
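Guozhang's point, restated as code: in 0.10.0 a serde object passed directly into the topology is not configured automatically (KAFKA-3729), so whatever it builds in configure() stays null. Besides removing the explicit serde, as Walter did, the other fix is to call `serde.configure(configs, false)` yourself (false for a value serde) with a map containing `schema.registry.url` before handing the instance to the topology. The toy below uses a hypothetical `ToySerde` with the same `configure(Map<String, ?>, boolean)` shape to show the failure and the fix; it is not the Confluent serializer.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for a serde that creates its registry client in configure().
// Using it without configure() reproduces the NullPointerException pattern.
final class ToySerde {
    private String registryClient; // stands in for the schema registry client

    // Same shape as Kafka's Serde.configure(Map<String, ?> configs, boolean isKey).
    void configure(Map<String, ?> configs, boolean isKey) {
        Object url = configs.get("schema.registry.url");
        if (url == null) {
            throw new IllegalArgumentException("schema.registry.url is required");
        }
        registryClient = "client for " + url;
    }

    String deserialize(byte[] data) {
        // Dereferences the client, so an unconfigured instance throws an NPE here.
        return registryClient.concat(" read " + data.length + " bytes");
    }

    static Map<String, Object> registryConfig(String url) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("schema.registry.url", url);
        return configs;
    }
}
```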
RE: SendFailedException
Sometimes engineers run scripts out of order, so we will need the exact steps you are following.
Are you running through VirtualBox/Vagrant? In that case we will need to see your Vagrantfile.local file: https://www.codatlas.com/github.com/apache/kafka/trunk/vagrant/system-test-Vagrantfile.local
We will also need the exact version you are running for the Kafka producer.
We will also need the exact version you are running for the Kafka consumer.
We will need to view the exact configurations you are using for:
.\config\connect-distributed.properties https://www.codatlas.com/github.com/apache/kafka/HEAD/config/connect-distributed.properties
-OR, if running standalone- .\config\connect-standalone.properties https://www.codatlas.com/github.com/apache/kafka/trunk/config/connect-standalone.properties
.\config\consumer.properties http://kafka.apache.org/082/documentation.html#consumerconfigs
.\config\producer.properties http://kafka.apache.org/082/documentation.html#producerconfigs
.\config\server.properties https://www.codatlas.com/github.com/apache/kafka/trunk/config/server.properties
Are you running a distributed coordination system such as ZooKeeper? https://www.codatlas.com/github.com/apache/kafka/trunk/config/zookeeper.properties
Which JVM version are you running? Are you building before running Kafka? In that case, which Scala compiler version?

Thanks
Martin Gainty
Enterprise *Contractor*

> From: achintya_gh...@comcast.com
> To: users@kafka.apache.org
> CC: d...@kafka.apache.org
> Subject: SendFailedException
> Date: Mon, 26 Sep 2016 20:08:22 +
>
> Hi there,
>
> Can anyone please help us? We are getting the SendFailedException when the Kafka consumer is starting, and it is not able to consume any message.
>
> Thanks
> Achintya
Consumer offsets reset for _all_ topics after increasing partitions for one topic
I increased partitions for one existing topic (2->10), but was surprised to see that it entirely reset the committed offsets of my consumer group. All topics and partitions were reset to the earliest available offset, and the consumer read everything again. The documentation doesn't mention anything like this. Is this how it's supposed to work, or is it a bug? I would have expected the consumer offsets not to decrease at all, especially for the topics that I didn't even touch. For the altered topic, I would have expected consumption of the previously existing partitions 0 and 1 to continue from where it was, and the newly added partitions to naturally be read from 0.

I added partitions according to the "Modifying topics" section of the Kafka 0.10.0 documentation ("To add partitions you can do"):

    > bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic altered_topic --partitions 10

Previously this topic had 2 partitions. For the consumer I'm using kafka.javaapi.consumer.ConsumerConnector.createMessageStreamsByFilter(), and the client dependency is groupId org.apache.kafka, artifactId kafka_2.11, version 0.10.0.1. The Kafka cluster itself is kafka_2.11-0.10.0.1.
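The behaviour the poster expected can be stated precisely: partitions that already have a committed offset resume from it, and only partitions without one fall back to a reset position (offset 0 in his description). The sketch below (hypothetical class `ExpectedStart`) encodes that expectation only; it does not describe what the 0.10.0 old consumer actually did, which is the open question in the message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Expected starting offset per partition after a topic grows: keep committed
// offsets, use resetOffset only where nothing has been committed yet.
final class ExpectedStart {
    static Map<Integer, Long> startingOffsets(Map<Integer, Long> committed,
                                              int partitionCount,
                                              long resetOffset) {
        Map<Integer, Long> start = new LinkedHashMap<>();
        for (int p = 0; p < partitionCount; p++) {
            start.put(p, committed.getOrDefault(p, resetOffset));
        }
        return start;
    }
}
```

For the altered topic (2 -> 10 partitions), partitions 0 and 1 would keep their committed positions and partitions 2 to 9 would start at the reset position.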
Re: producer can't push msg sometimes with 1 broker recoved
Aggie, I'm not able to reproduce your behavior in 0.10.0.1.

> I did more testing and find the rule (Topic is created with "--replication-factor 2 --partitions 1" in following case):
>     node 1          node 2
>     down(lead)      down (replica)
>     down(replica)   up (lead)        producer send fail !!!

When node 2 is up, after the metadata update the producer is able to connect and send messages to it. Logs:

    [2016-09-27T15:18:17,907] NetworkClient: handleDisconnections(): Node 1 disconnected.
    [2016-09-27T15:18:18,007] NetworkClient: initiateConnect(): Initiating connection to node 1 at localhost:9093.
    [2016-09-27T15:18:18,008] Selector: pollSelectionKeys(): Connection with localhost/127.0.0.1 disconnected
    java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_45]
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_45]
        at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51) ~[kafka-clients-0.10.0.1.jar:?]
        at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73) ~[kafka-clients-0.10.0.1.jar:?]
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:309) [kafka-clients-0.10.0.1.jar:?]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:283) [kafka-clients-0.10.0.1.jar:?]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.0.1.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229) [kafka-clients-0.10.0.1.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) [kafka-clients-0.10.0.1.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
    [2016-09-27T15:18:18,008] NetworkClient: handleDisconnections(): Node 1 disconnected.
    [2016-09-27T15:18:18,043] NetworkClient: maybeUpdate(): Sending metadata request {topics=[hello]} to node 0
    [2016-09-27T15:18:18,052] Metadata: update(): Updated cluster metadata version 4 to Cluster(nodes = [tcltest1.nmsworks.co.in:9092 (id: 0 rack: null)], partitions = [Partition(topic = hello, partition = 0, leader = none, replicas = [0,1,], isr = []])
    [2016-09-27T15:18:19,053] NetworkClient: maybeUpdate(): Sending metadata request {topics=[hello]} to node 0
    [2016-09-27T15:18:19,056] Metadata: update(): Updated cluster metadata version 5 to Cluster(nodes = [tcltest1.nmsworks.co.in:9092 (id: 0 rack: null)], partitions = [Partition(topic = hello, partition = 0, leader = 0, replicas = [0,1,], isr = [0,]])
    [2016-09-27T15:18:19,081] KafkaProducer: main(): Batch : 4 sent
    [2016-09-27T15:18:19,182] KafkaProducer: main(): Batch : 5, Sending the record with key : 0

- Kamal

On Mon, Sep 26, 2016 at 8:53 AM, FEI Aggie wrote:
> Kamal,
> Thanks for your response. I tried testing with metadata.max.age.ms reduced to 10s, but the behavior has not changed, and the producer still can't find the live broker.
>
> I did more testing and found the rule (the topic is created with "--replication-factor 2 --partitions 1" in the following cases):
>
>     node 1          node 2
>     down(lead)      down (replica)
>     down(replica)   up (lead)        producer send fail !!!
>
>     down(lead)      down (replica)
>     up (lead)       down (replica)   producer send ok !!!
>
> If only the node with the original lead partition is up, everything is fine.
> If only the node with the original replica partition is up, the producer can't connect to the live broker (it always tries to connect to the original lead broker, node 1 in my case).
>
> Can't Kafka recover from this situation? Does anyone have a clue about this?
>
> Thanks!
> Aggie
>
> -Original Message-
> From: Kamal C [mailto:kamaltar...@gmail.com]
> Sent: Saturday, September 24, 2016 1:37 PM
> To: users@kafka.apache.org
> Subject: Re: producer can't push msg sometimes with 1 broker recoved
>
> Reduce the metadata refresh interval 'metadata.max.age.ms' from 5 min to your desired time interval.
> This may reduce the time window of non-availability broker.
>
> -- Kamal
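The reasoning behind Kamal's suggestion: the producer re-fetches metadata on its own once its copy is older than metadata.max.age.ms (300000 ms by default, the "5 min" above), or sooner if the client has flagged its metadata as needing an update, and in either case no sooner than the retry backoff after the previous attempt. The sketch below is a simplified model of that timing, loosely following the 0.10 client's Metadata.timeToNextUpdate(); it is not the client's code, and the parameter names are ours.

```java
// Simplified model of when the producer will next refresh cluster metadata.
final class MetadataTiming {
    static long timeToNextUpdate(long nowMs,
                                 long lastRefreshMs,
                                 long lastSuccessfulRefreshMs,
                                 long metadataMaxAgeMs,
                                 long refreshBackoffMs,
                                 boolean updateRequested) {
        // Time until the cached metadata is considered stale (zero if an update was requested).
        long timeToExpire = updateRequested
                ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataMaxAgeMs - nowMs, 0);
        // Never refresh sooner than the backoff after the previous attempt.
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(Math.max(timeToExpire, timeToAllowUpdate), 0);
    }
}
```

With the default age, one second after a refresh the producer would wait another 299000 ms before refreshing on its own; with metadata.max.age.ms at 10000 the same calculation gives 9000 ms.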
Re: Handling out-of-order messaging w/ Kafka Streams
Hi Mathieu, If the messages are sent asynchronously, then what you're observing is indeed right. There is no guarantee that the first will arrive at the destination first. Perhaps you can try sending them synchronously (i.e., wait until the first one is received before sending the second). That might slow down your pipeline, but perhaps that's acceptable? Eno > On 27 Sep 2016, at 04:26, Mathieu Fenniak wrote: > > Hey Apache Users, > > I'm working on a web application that has a web service component and a > background processor component. Both applications will send messages to > the same Kafka topic as an object is manipulated. > > In some cases, a web service call in the service component will send a > message to Kafka saying key K has state S1, then trigger a background > operation, and then the background component will send a message to Kafka > saying key K has state S2. However, I'm finding that the topic ends up > occasionally having a message K/S2 followed by K/S1, rather than the other > way around. As both producers in the web service call and the background > processor send messages asynchronously with librdkafka, I believe this is a > relatively simple race condition where messages just aren't coming in like > I'd like them to. > > In a consuming Kafka Streams application, I'd be creating a KTable of this > topic. What approaches can I take to ensure the KTable will end up > with K/S2 as the state for K, rather than the staler K/S1? > > Would KS reorder messages if they had ordered & coordinated timestamps? If > so, how much leeway would it have for S2 being delivered before S1? (I > believe librdkafka 0.9.1 doesn't support sending create-time in messages, > which makes this a bit more painful.) > > Any other approaches that are worth exploring? > > Thanks for any thoughts, > > Mathieu
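Mathieu's "ordered & coordinated timestamps" idea can also be applied in the consuming application itself: attach a version or timestamp to each state change and let the newer one win regardless of arrival order. A minimal sketch, assuming both producers can stamp messages from a shared, monotonically increasing source (for example a version column in the database); `StampedState` and `LatestWins` are hypothetical names, and in Kafka Streams this logic would live in your own reduce or aggregate step rather than in the KTable itself.

```java
import java.util.HashMap;
import java.util.Map;

// A state value plus the stamp (version or coordinated timestamp) it was produced at.
final class StampedState {
    final String state;
    final long stamp;

    StampedState(String state, long stamp) {
        this.state = state;
        this.stamp = stamp;
    }
}

// Keeps, per key, the value with the highest stamp seen so far, so a stale
// K/S1 arriving after K/S2 does not overwrite it.
final class LatestWins {
    private final Map<String, StampedState> table = new HashMap<>();

    void apply(String key, StampedState incoming) {
        table.merge(key, incoming, (current, next) -> next.stamp > current.stamp ? next : current);
    }

    String stateOf(String key) {
        StampedState s = table.get(key);
        return s == null ? null : s.state;
    }
}
```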
Initialisation of the context
Hi, I would like to know how the context is initialised in Kafka Streams, because I have a problem with one Kafka Streams application: every time I call it, I notice that the context is initialised (or created) more than once, which is abnormal, and this causes a bug in the system. Hamza