[jira] [Commented] (KAFKA-6339) Integration test with embedded kafka not working
[ https://issues.apache.org/jira/browse/KAFKA-6339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290024#comment-16290024 ]

Narendra Kumar commented on KAFKA-6339:
---

Hi Dhruv,
Could you please enable debug logging and provide more logs?

> Integration test with embedded kafka not working
>
> Key: KAFKA-6339
> URL: https://issues.apache.org/jira/browse/KAFKA-6339
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.2
> Reporter: DHRUV BANSAL
>
> I am using Kafka version 0.11.0.2 and trying to write an integration test
> for one of the components I am building on top of Kafka.
> The following code works fine with Kafka version 0.10.0.0 but does not work
> with the version mentioned above (0.11.0.2).
> // setup Zookeeper
> {code:java}
> String ZKHOST = "127.0.0.1";
> String BROKERHOST = "127.0.0.1";
> String BROKERPORT = "9093";
> String TOPIC = "test1";
> EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
> String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
> ZkClient zkClient = new ZkClient(zkConnect, 3, 3, ZKStringSerializer$.MODULE$);
> ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
> // setup Broker
> Properties brokerProps = new Properties();
> brokerProps.setProperty("zookeeper.connect", zkConnect);
> brokerProps.setProperty("broker.id", "0");
> brokerProps.setProperty("offsets.topic.replication.factor", "1");
> String kafka_log_path = Files.createTempDirectory("kafka-").toAbsolutePath().toString();
> System.out.println("kafka log path " + kafka_log_path);
> brokerProps.setProperty("log.dirs", kafka_log_path);
> brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
> KafkaConfig config = new KafkaConfig(brokerProps);
> Time mock = new MockTime();
> KafkaServer kafkaServer = TestUtils.createServer(config, mock);
> // create topic
> AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
> // setup producer
> Properties producerProps = new Properties();
> producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
> producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
> producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
> KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps);
> // setup consumer
> Properties consumerProps = new Properties();
> consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
> consumerProps.setProperty("group.id", "group0");
> consumerProps.setProperty("client.id", "consumer0");
> consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
> consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> // to make sure the consumer starts from the beginning of the topic
> consumerProps.put("auto.offset.reset", "earliest");
> KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
> consumer.subscribe(Arrays.asList(TOPIC));
> // send message
> ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42,
>         "test-message".getBytes(StandardCharsets.UTF_8));
> Future<RecordMetadata> record = producer.send(data);
> RecordMetadata metadata = record.get();
> // starting consumer
> ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
> assertEquals(1, records.count());
> Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
> ConsumerRecord<Integer, byte[]> consumedRecord = recordIterator.next();
> System.out.printf("offset = %d, key = %s, value = %s",
>         consumedRecord.offset(), consumedRecord.key(), consumedRecord.value());
> assertEquals(42, (int) consumedRecord.key());
> assertEquals("test-message", new
[jira] [Commented] (KAFKA-6339) Integration test with embedded kafka not working
[ https://issues.apache.org/jira/browse/KAFKA-6339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16287218#comment-16287218 ]

DHRUV BANSAL commented on KAFKA-6339:
-

Hello Narendra,
Thanks for the response. The problem is that the test keeps running forever and the same logs keep repeating.
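
One call in the quoted test that can block without limit is `record.get()`, which waits for the send to complete and takes no timeout. The following is a minimal sketch, using only `java.util.concurrent`, of bounding such a wait so that a hang shows up as a fast failure. A never-completing `CompletableFuture` stands in for the `Future` returned by `producer.send(data)`; `BoundedWait` and `getOrFail` are hypothetical names, and it is only an assumption that this is where the 0.11.0.2 test hangs, since the logs in this thread do not confirm it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWait {

    // Waits at most timeoutMs for the future. A timeout becomes an
    // IllegalStateException so a test fails instead of hanging.
    static <T> T getOrFail(Future<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new IllegalStateException(
                    "no result within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new RuntimeException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for producer.send(data): a future that never completes.
        Future<String> stuck = new CompletableFuture<>();
        try {
            getOrFail(stuck, 200);
        } catch (IllegalStateException e) {
            System.out.println("failed fast: " + e.getMessage());
        }

        // An already completed future returns immediately.
        Future<String> done = CompletableFuture.completedFuture("ok");
        System.out.println(getOrFail(done, 200));
    }
}
```

In the test itself the equivalent change would be to call `record.get(timeout, unit)`, which is part of the standard `Future` interface, in place of `record.get()`.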
[jira] [Commented] (KAFKA-6339) Integration test with embedded kafka not working
[ https://issues.apache.org/jira/browse/KAFKA-6339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286332#comment-16286332 ]

Narendra Kumar commented on KAFKA-6339:
---

Hi Dhruv,
Sorry for the delayed response. This does not look like an error to me: the log level is 'INFO'. Is your test failing? Does the test keep running forever? What exact problem are you facing? From the logs you posted, there does not seem to be an issue.
[jira] [Commented] (KAFKA-6339) Integration test with embedded kafka not working
[ https://issues.apache.org/jira/browse/KAFKA-6339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16284946#comment-16284946 ]

DHRUV BANSAL commented on KAFKA-6339:
-

I am writing a separate application and am trying to write an integration test for it with embedded Kafka (code provided in the issue description). This integration test works perfectly with release 0.10.0.0, but after upgrading to 0.11.0.2 I get the following output:

00:09:06.112 [ProcessThread(sid:0 cport:50192):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1603c92dae80001 type:create cxid:0x5 zxid:0x4 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers
00:09:06.119 [ProcessThread(sid:0 cport:50192):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1603c92dae80001 type:create cxid:0xb zxid:0x8 txntype:-1 reqpath:n/a Error Path:/config Error:KeeperErrorCode = NoNode for /config
00:09:06.126 [ProcessThread(sid:0 cport:50192):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1603c92dae80001 type:create cxid:0x13 zxid:0xd txntype:-1 reqpath:n/a Error Path:/admin Error:KeeperErrorCode = NoNode for /admin
00:09:06.183 [ProcessThread(sid:0 cport:50192):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1603c92dae80001 type:create cxid:0x1d zxid:0x13 txntype:-1 reqpath:n/a Error Path:/cluster Error:KeeperErrorCode = NoNode for /cluster
00:09:06.539 [ProcessThread(sid:0 cport:50192):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1603c92dae80001 type:setData cxid:0x27 zxid:0x17 txntype:-1 reqpath:n/a Error Path:/controller_epoch Error:KeeperErrorCode = NoNode for /controller_epoch
00:09:06.684 [ProcessThread(sid:0 cport:50192):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1603c92dae80001 type:delete cxid:0x3c zxid:0x1a txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
00:09:06.770 [ProcessThread(sid:0 cport:50192):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1603c92dae80001 type:create cxid:0x46 zxid:0x1b txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
00:09:06.770 [ProcessThread(sid:0 cport:50192):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1603c92dae80001 type:create cxid:0x47 zxid:0x1c txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
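
The ZooKeeper log lines in this message can be inspected mechanically to see what each one reports. The sketch below pulls out the log level, request type, path, and `KeeperErrorCode` from one such line. The regular expression is inferred only from the lines quoted in this thread, not from any ZooKeeper specification, and `ZkLogLine` is a hypothetical helper name. Every quoted line parses to level `INFO` with code `NoNode` or `NodeExists`, which matches the observation elsewhere in the thread that the output is logged at INFO level.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ZkLogLine {

    // Assumed layout: "<time> [<thread>] <LEVEL> <logger> - <message>",
    // with type:, Error Path: and KeeperErrorCode fields in the message.
    private static final Pattern LINE = Pattern.compile(
            "^\\S+ \\[[^\\]]*\\] (\\w+)\\s+\\S+ - .*?type:(\\w+) "
            + ".*?Error Path:(\\S+) Error:KeeperErrorCode = (\\w+)");

    final String level;
    final String requestType;
    final String path;
    final String errorCode;

    private ZkLogLine(String level, String requestType,
                      String path, String errorCode) {
        this.level = level;
        this.requestType = requestType;
        this.path = path;
        this.errorCode = errorCode;
    }

    // Returns the parsed fields, or empty if the line has another shape.
    static Optional<ZkLogLine> parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new ZkLogLine(
                m.group(1), m.group(2), m.group(3), m.group(4)));
    }

    public static void main(String[] args) {
        String sample = "00:09:06.112 [ProcessThread(sid:0 cport:50192):] INFO "
                + "org.apache.zookeeper.server.PrepRequestProcessor - Got user-level "
                + "KeeperException when processing sessionid:0x1603c92dae80001 "
                + "type:create cxid:0x5 zxid:0x4 txntype:-1 reqpath:n/a "
                + "Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers";
        parse(sample).ifPresent(l -> System.out.println(
                l.level + " " + l.requestType + " " + l.path + " " + l.errorCode));
    }
}
```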