[jira] [Comment Edited] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434927#comment-16434927 ]

Veera edited comment on KAFKA-6052 at 4/12/18 4:58 AM:
---

Hi Jason / Vahid,

Could you please give an update on the release date of 1.1.1 or 1.2.0?

Regards,
Veeru

was (Author: vmallavarapu):

Hi Jason / Vahid,

Could you please give an update on the release date of 1.1.1 or 1.2.0?

Regards,
Veeru

> Windows: Consumers not polling when isolation.level=read_committed
> ---
>
>              Key: KAFKA-6052
>              URL: https://issues.apache.org/jira/browse/KAFKA-6052
>          Project: Kafka
>       Issue Type: Bug
>       Components: consumer, producer
> Affects Versions: 0.11.0.0, 1.0.1
>      Environment: Windows 10. All processes running in embedded mode.
>         Reporter: Ansel Zandegran
>         Assignee: Vahid Hashemian
>         Priority: Major
>           Labels: transactions, windows
>          Fix For: 1.2.0, 1.1.1
>
>      Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, logFile.log
>
>
> *The same code is running fine on Linux.* I am trying to send a transactional record with exactly-once semantics. These are my producer, consumer and broker setups.
>
> public void sendWithTTemp(String topic, EHEvent event) {
>   Properties props = new Properties();
>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>             "localhost:9092,localhost:9093,localhost:9094");
>   //props.put("bootstrap.servers", "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
>   props.put(ProducerConfig.ACKS_CONFIG, "all");
>   props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>   props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>   props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
>   props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
>   props.put("transactional.id", "TID" + transactionId.incrementAndGet());
>   props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
>   Producer producer =
>       new KafkaProducer<>(props,
>                           new StringSerializer(),
>                           new StringSerializer());
>   Logger.log(this, "Initializing transaction...");
>   producer.initTransactions();
>   Logger.log(this, "Initializing done.");
>   try {
>     Logger.log(this, "Begin transaction...");
>     producer.beginTransaction();
>     Logger.log(this, "Begin transaction done.");
>     Logger.log(this, "Sending events...");
>     producer.send(new ProducerRecord<>(topic,
>                                        event.getKey().toString(),
>                                        event.getValue().toString()));
>     Logger.log(this, "Sending events done.");
>     Logger.log(this, "Committing...");
>     producer.commitTransaction();
>     Logger.log(this, "Committing done.");
>   } catch (ProducerFencedException | OutOfOrderSequenceException
>            | AuthorizationException e) {
>     producer.close();
>     e.printStackTrace();
>   } catch (KafkaException e) {
>     producer.abortTransaction();
>     e.printStackTrace();
>   }
>   producer.close();
> }
>
> *In Consumer*
> I have set isolation.level=read_committed
>
> *In 3 Brokers*
> I'm running with the following properties:
>
> Properties props = new Properties();
> props.setProperty("broker.id", "" + i);
> props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
> props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + i);
> props.setProperty("num.partitions", "1");
> props.setProperty("zookeeper.connect", "localhost:2181");
> props.setProperty("zookeeper.connection.timeout.ms", "6000");
> props.setProperty("min.insync.replicas", "2");
> props.setProperty("offsets.topic.replication.factor", "2");
> props.setProperty("offsets.topic.num.partitions", "1");
> props.setProperty("transaction.state.log.num.partitions", "2");
> props.setProperty("transaction.state.log.replication.factor", "2");
> props.setProperty("transaction.state.log.min.isr", "2");
>
> I am not getting any records in the consumer. When I set isolation.level=read_uncommitted, I get the records. I assume that the records are not getting committed. What could be the problem? Log attached.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
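The consumer configuration in the report above is described only in words. As an illustration, a minimal sketch of the properties such a consumer would use, with plain string keys so it is self-contained; the group id is a hypothetical placeholder, not taken from the report:

```java
import java.util.Properties;

public class ReadCommittedConsumerProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                "localhost:9092,localhost:9093,localhost:9094");
        props.setProperty("group.id", "eh-event-group"); // hypothetical placeholder
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The setting under discussion: with read_committed the consumer only
        // receives records from transactions that have actually been committed;
        // records of open or aborted transactions are withheld.
        props.setProperty("isolation.level", "read_committed");

        System.out.println(props.getProperty("isolation.level")); // prints read_committed
    }
}
```

These properties would then be passed to `new KafkaConsumer<>(props)`; switching the value back to read_uncommitted reproduces the workaround described in the report.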
[jira] [Comment Edited] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430366#comment-16430366 ]

Veera edited comment on KAFKA-6052 at 4/11/18 7:28 AM:
---

Hi Jason,

Could you please confirm the release date of either 1.1.1 or 1.2.0? We will wait if it happens this month; otherwise we will take the fix and apply it to our 1.0.0 version.

Thanks,
Veeru

was (Author: vmallavarapu):

Hi Jason,

Could you please confirm the release date of either 1.1.1 or 1.2.0?

Many thanks.
[jira] [Comment Edited] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376515#comment-16376515 ]

Jason Gustafson edited comment on KAFKA-6052 at 2/26/18 8:27 AM:
---

I had one admittedly far-fetched idea. The inter-broker send thread, which is responsible for sending transaction markers, does not block for connection establishment. The first time a request is sent to a node, the connection is initiated; but unless the connection is established extremely quickly, we'll just fail the request and depend on it being retried some time later, after the connection has been established. The problem is that the request's completion handler is immediately invoked, which causes the request to be re-enqueued and the selector to be woken up. I am wondering if the wakeup might be treated differently on Windows: in particular, maybe it is short-circuiting the select operation and not allowing the connection to establish. On the next iteration of the send thread's loop, the same thing would happen, and we'd never be able to send the marker. An easy way to test this theory is to disable the wakeup and switch to a busier poll model. I've done this here: https://github.com/hachikuji/kafka/commit/7bdb499b83b74e4c263458d0f1f90d68536fe7db. If any users who are seeing this bug can test this out, I'd appreciate it.

was (Author: hachikuji):

I had one admittedly far-fetched idea. The inter-broker send thread, which is responsible for sending transaction markers, does not block for connection establishment. The first time a request is sent to a node, the connection is initiated; but unless the connection is established extremely quickly, we'll just fail the request and depend on it being retried some time later, after the connection has been established. The problem is that the request's completion handler is immediately invoked, which causes the request to be re-enqueued and the selector to be woken up. I am wondering if the wakeup might be treated differently on Windows: in particular, maybe it is short-circuiting the select operation and not allowing the connection to establish. On the next iteration of the send thread's loop, the same thing would happen, and we'd never be able to send the marker. An easy way to test this theory is to disable the wakeup and switch to a busier poll model. I've done this here: https://github.com/hachikuji/kafka/commit/7bdb499b83b74e4c263458d0f1f90d68536fe7db. If any users who are seeing this bug can test this out, I'd appreciate it.
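The theory in the comment above rests on a documented java.nio behavior: calling Selector.wakeup() while no select is in progress makes the *next* select() return immediately, so a loop that is woken on every iteration never actually waits. A minimal, Kafka-independent sketch of that mechanism (a hypothetical illustration, not the actual inter-broker send thread code):

```java
import java.nio.channels.Selector;

public class WakeupShortCircuit {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();

        // Simulate what the request's completion handler does: it re-enqueues
        // the failed request and wakes the selector before the loop blocks again.
        selector.wakeup();

        long start = System.nanoTime();
        int ready = selector.select(5000);          // asks to block up to 5 s...
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        // ...but the pending wakeup makes select() return almost immediately,
        // with no channels ready, before a slow connection could finish
        // establishing.
        System.out.println("ready=" + ready + ", elapsedMs=" + elapsedMs);

        selector.close();
    }
}
```

If every iteration of the loop triggers a fresh wakeup, select() never actually waits, which is why the linked commit removes the wakeup and instead polls with a short fixed timeout.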
[jira] [Comment Edited] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16204571#comment-16204571 ]

Ansel Zandegran edited comment on KAFKA-6052 at 10/14/17 10:47 AM:
---

Hi Apurva,

Thanks for the comments. Yes, they won't correlate, as they are from different executions. Since you asked only for the client logs at TRACE level, I added the others just in case. Regarding the transaction ID, we tried different approaches, including creating producers with different transaction IDs for each producer. Now that we have figured out that the problem is with the environment, it doesn't make sense. So I have cleaned up that code and created [^Separate_Logs.zip], which should correlate with each other. Hopefully this helps. Thanks!

was (Author: zandegran):

Hi Apurva,

Thanks for the comments. Yes, they won't correlate, as they are from different executions. Since you asked only for the client logs at TRACE level, I added the others just in case. Regarding the transaction ID, we tried different approaches, including creating producers with different transaction IDs for each producer. Now that we have figured out that the problem is with the environment, it doesn't make sense. So I have cleaned up that code and created separate logs, which should correlate with each other. Hopefully this helps. Thanks!
[jira] [Comment Edited] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16203299#comment-16203299 ]

Ansel Zandegran edited comment on KAFKA-6052 at 10/13/17 11:22 AM:
---

Yes, the entire setup is running on Windows, not just the clients. I have attached the new TRACE-level log. The log includes producer and consumer logs along with the 3 brokers and the ZooKeeper host. I have also added the TRACE-level log for just the producer and consumer. Data log files are also attached. Thanks.

was (Author: zandegran):

Yes, the entire setup is running on Windows, not just the clients. I have attached the new TRACE-level log. The log includes producer and consumer logs along with the 3 brokers and the ZooKeeper host. I have also added the TRACE-level log for just the producer and consumer. Thanks.
[jira] [Comment Edited] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16203299#comment-16203299 ]

Ansel Zandegran edited comment on KAFKA-6052 at 10/13/17 9:48 AM:
---

Yes, the entire setup is running on Windows, not just the clients. I have attached the new TRACE-level log. The log includes producer and consumer logs along with the 3 brokers and the ZooKeeper host. I have also added the TRACE-level log for just the producer and consumer. Thanks.

was (Author: zandegran):

Yes, the entire setup is running on Windows, not just the clients. I have attached the new TRACE-level log. The log includes producer and consumer logs along with the 3 brokers and the ZooKeeper host. Thanks.