[jira] [Commented] (KAFKA-10457) JsonConverter.toConnectData trims BigInteger to Long for schema-less case
[ https://issues.apache.org/jira/browse/KAFKA-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794854#comment-17794854 ]

Waleed Fateem commented on KAFKA-10457:
---------------------------------------

[~sagarrao] we ran into this issue recently. In our scenario, the data is coming from IoT devices in JSON format without any schema, so there's no control over the data. I believe the user ended up having to use the string converter instead.

> JsonConverter.toConnectData trims BigInteger to Long for schema-less case
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-10457
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10457
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Oleksandr Diachenko
>            Assignee: Oleksandr Diachenko
>            Priority: Critical
>
> When _JsonConverter_ is configured with _schemas.enable=false_ and a value
> exceeding _Long_ is passed, the result is incorrect since the converter
> trims it to _Long_:
> {code:java}
> Map<String, Object> props = Collections.singletonMap("schemas.enable", false);
> converter.configure(props, true);
> BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new BigInteger("1"));
> String msg = value.toString();
> SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
> assertNull(schemaAndValue.schema());
> assertEquals(value, schemaAndValue.value());
> {code}
> Fails with:
> {code:java}
> expected:<9223372036854775808> but was:<-9223372036854775808>
> Expected :9223372036854775808
> Actual   :-9223372036854775808
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
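The sign flip in the failure above is ordinary two's-complement truncation: narrowing a BigInteger one past Long.MAX_VALUE down to a long keeps only the low 64 bits, so 2^63 comes back as Long.MIN_VALUE. A minimal standalone illustration (plain Java, not Connect code):

```java
import java.math.BigInteger;

public class NarrowingDemo {
    public static void main(String[] args) {
        // 2^63, one past Long.MAX_VALUE -- the same value the test above uses
        BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE);

        // longValue() keeps only the low 64 bits; the sign bit ends up set
        long truncated = value.longValue();

        System.out.println(value);      // 9223372036854775808
        System.out.println(truncated);  // -9223372036854775808
    }
}
```

This is exactly the pair of numbers in the assertion failure quoted in the ticket.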
[jira] [Resolved] (KAFKA-14520) TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned
[ https://issues.apache.org/jira/browse/KAFKA-14520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Waleed Fateem resolved KAFKA-14520.
-----------------------------------
    Resolution: Duplicate

> TimeoutException Raised by KafkaConsumer Leads to: User provided listener
> org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on
> invocation of onPartitionsAssigned
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-14520
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14520
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.2.1
>            Reporter: Waleed Fateem
>            Priority: Minor
>
> I'm on the fence about whether this should actually be considered a bug, but
> decided to open it as such from the perspective of a sink developer. Even
> though there's a sign of a potential issue on the Kafka broker's side, we're
> dependent on Kafka Connect to provide a level of robustness so we don't have
> to manually intervene to restart the connector.
> We don't have access to the Kafka broker cluster, so we don't know what the
> underlying issue might be that caused the following error during a rebalance:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","org.apache.kafka.common.errors.TimeoutException:
> Timeout of 6ms expired before the position for partition topic-partition-2
> could be determined {code}
> That leads to the following problem:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR [Consumer
> clientId=connector-consumer-the-sink-1, groupId=connect-the-sink] User
> provided listener
> org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on
> invocation of onPartitionsAssigned for partitions []
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> [task-thread-the-sink-1]
> {code}
> The KafkaConsumer's position() method invoked in the WorkerSinkTask's
> HandleRebalance
> [code|https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697]
> causing that TimeoutException is:
> {code:java}
> private class HandleRebalance implements ConsumerRebalanceListener {
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>         for (TopicPartition tp : partitions) {
>             long pos = consumer.position(tp);
>             lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>             currentOffsets.put(tp, new OffsetAndMetadata(pos));
>             log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
>         }
> {code}
> Which is then considered an unrecoverable error
> [here|https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L210]:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR
> WorkerSinkTask{id=the-sink-1} Task threw an uncaught and unrecoverable
> exception. Task is being killed and will not recover until manually restarted
> (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-the-sink-1] {code}
> Do we expect that TimeoutException to cause the task to be killed, or should
> this have been handled somehow in the WorkerSinkTask's HandleRebalance code?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
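Since TimeoutException in the consumer API signals a retriable condition, one mitigation a runtime (or sink) could take is a bounded retry around the position lookup rather than letting the first timeout kill the task. The sketch below shows only that generic pattern with illustrative names; it is not the actual Connect code or the fix, and RuntimeException stands in for Kafka's TimeoutException so the example is self-contained:

```java
import java.util.function.Supplier;

public class BoundedRetry {
    // Retry a lookup a fixed number of times before rethrowing the last error.
    // RuntimeException stands in for Kafka's (retriable) TimeoutException.
    public static <T> T withRetries(Supplier<T> lookup, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                last = e; // transient failure: try again
            }
        }
        throw last; // still failing after maxAttempts: surface the error
    }
}
```

In the real listener this would wrap `consumer.position(tp)`; whether retrying inside `onPartitionsAssigned` (which runs within the rebalance callback) is appropriate is exactly the open question the ticket raises.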
[jira] [Commented] (KAFKA-14520) TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned
[ https://issues.apache.org/jira/browse/KAFKA-14520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17702185#comment-17702185 ]

Waleed Fateem commented on KAFKA-14520:
---------------------------------------

Yes, it sounds like it's pretty much the same issue. The user who originally reported this problem highlighted that it occurred randomly after a topic from the list of topics the connector was subscribed to (also using a regex) was deleted. I initially thought it was unrelated because when they reproduced the problem, the problematic partition belonged to another topic, not the one that was deleted. In fact, I saw that the consumer didn't have any issues with all the other partitions of that same topic. I don't think it's a coincidence, and it's probably reasonable to say that this is a duplicate of [KAFKA-14750|https://issues.apache.org/jira/browse/KAFKA-14750].

I'm honestly not sure what the best way to handle this would be. For now, I'll go ahead and close this Jira and mark it as a duplicate. If I'm able to reproduce the problem or have any ideas worth sharing, I'll comment on the other Jira. Thanks again for taking the time to reply!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (KAFKA-14520) TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned
[ https://issues.apache.org/jira/browse/KAFKA-14520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17702118#comment-17702118 ]

Waleed Fateem commented on KAFKA-14520:
---------------------------------------

Hi [~sagarrao],

Thank you for the response. I'm honestly not sure about the console consumer, or whether other consumers reading from that topic at the time experienced the same issue. However, that wasn't my main concern. The focus was on the Kafka sink connector and understanding why a task was failing. The issue was that in this particular environment there wasn't a good way of monitoring the different connectors and their tasks, so it only became clear that there was a problem when other users noticed that some data wasn't making it to the destination.

So it sounds like this is expected then, and it's not an issue with the Kafka Connect runtime, nor is there reason to believe an enhancement of some sort would make sense. Is that a fair statement? Is there anything you think a sink connector can or should do in a situation like this? My understanding is that there's not much we can do here, right?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Resolved] (KAFKA-6301) Incorrect Java Regex example '*' for mirroring all topics
[ https://issues.apache.org/jira/browse/KAFKA-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Waleed Fateem resolved KAFKA-6301.
----------------------------------
    Resolution: Won't Fix

Really old and irrelevant at this point.

> Incorrect Java Regex example '*' for mirroring all topics
> ---------------------------------------------------------
>
>                 Key: KAFKA-6301
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6301
>             Project: Kafka
>          Issue Type: Bug
>          Components: documentation
>    Affects Versions: 0.10.2.0, 0.11.0.0, 1.0.0
>            Reporter: Waleed Fateem
>            Assignee: Waleed Fateem
>            Priority: Minor
>              Labels: documentation, mirror-maker
>
> The documentation for the section "Mirroring data between clusters" states the
> following:
> Or you could mirror all topics using --whitelist '*'
> The regular expression should be '.*' instead.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
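The distinction matters because the whitelist is compiled as a Java regular expression, where a bare `*` is a dangling quantifier (there is nothing for it to repeat) while `.*` matches any topic name. A quick check, independent of MirrorMaker itself:

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class WhitelistRegexCheck {
    public static void main(String[] args) {
        // ".*" is a valid regex that matches any topic name
        System.out.println(Pattern.matches(".*", "any-topic")); // true

        // "*" alone is not a valid Java regex: the quantifier has no operand
        try {
            Pattern.compile("*");
        } catch (PatternSyntaxException e) {
            System.out.println("invalid pattern: " + e.getMessage());
        }
    }
}
```

In a shell, `*` is also subject to glob expansion when unquoted, which is a second reason the documented example misbehaves.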
[jira] [Updated] (KAFKA-14520) TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned
[ https://issues.apache.org/jira/browse/KAFKA-14520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Waleed Fateem updated KAFKA-14520:
----------------------------------
    Description:

I'm on the fence about whether this should actually be considered a bug, but decided to open it as such from the perspective of a sink developer. Even though there's a sign of a potential issue on the Kafka broker's side, we're dependent on Kafka Connect to provide a level of robustness so we don't have to manually intervene to restart the connector.

We don't have access to the Kafka broker cluster, so we don't know what the underlying issue might be that caused the following error during a rebalance:

{code:java}
Nov 21, 2022 @ 06:09:44.234","org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition topic-partition-2 could be determined {code}

That leads to the following problem:

{code:java}
Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR [Consumer clientId=connector-consumer-the-sink-1, groupId=connect-the-sink] User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned for partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-the-sink-1]
{code}

The KafkaConsumer's position() method invoked in the WorkerSinkTask's HandleRebalance [code|https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697] causing that TimeoutException is:

{code:java}
private class HandleRebalance implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
        for (TopicPartition tp : partitions) {
            long pos = consumer.position(tp);
            lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
            currentOffsets.put(tp, new OffsetAndMetadata(pos));
            log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
        }
{code}

Which is then considered an unrecoverable error [here|https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L210]:

{code:java}
Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR WorkerSinkTask{id=the-sink-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-the-sink-1] {code}

Do we expect that TimeoutException to cause the task to be killed, or should this have been handled somehow in the WorkerSinkTask's HandleRebalance code?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (KAFKA-14520) TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned
Waleed Fateem created KAFKA-14520:
----------------------------------

             Summary: TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned
                 Key: KAFKA-14520
                 URL: https://issues.apache.org/jira/browse/KAFKA-14520
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 3.2.1
            Reporter: Waleed Fateem

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Issue Comment Deleted] (KAFKA-6302) Topic can not be recreated after it is deleted
[ https://issues.apache.org/jira/browse/KAFKA-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Waleed Fateem updated KAFKA-6302:
---------------------------------
    Comment: was deleted

(was: Hi kic,

I think you should be able to recreate a topic with the same name without any issues, but this is with the assumption that the topic doesn't already exist. Did you make sure that the topic was deleted properly? Did you run the kafka-topics --zookeeper ZHOST:2181 --describe command after deletion to confirm that your topic was in fact deleted and not in the "marked for deletion" state? What error, if any, did you see when you attempted to create the topic again?

I don't believe the topic will be deleted so long as you have clients connected to the topic. I think at that point Kafka is just going to mark the topic for deletion. I would need to run a test to confirm.)

> Topic can not be recreated after it is deleted
> ----------------------------------------------
>
>                 Key: KAFKA-6302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6302
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin, clients
>    Affects Versions: 1.0.0
>            Reporter: kic
>            Assignee: Matthias J. Sax
>            Priority: Major
>             Fix For: 1.0.1, 1.1.0
>
>
> I use an embedded kafka for unit test. My application relies on the ability
> to recreate topics programmatically. Currently it is not possible to
> re-create a topic after it has been deleted.
> {code}
> // needs compile time dependency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0'
> // and 'org.apache.kafka:kafka-clients:1.0.0'
> package kic.kafka.embedded
>
> import java.util.Properties
> import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
> import org.scalatest._
> import scala.collection.JavaConverters._
>
> class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
>   val props = new Properties()
>   val testTopic = "test-topic"
>
>   "The admin client" should "be able to create, delete and re-create topics" in {
>     props.setProperty("bootstrap.servers", "localhost:10001")
>     props.setProperty("delete.enable.topic", "true")
>     props.setProperty("group.id", "test-client")
>     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
>     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>     props.setProperty("clinet.id", "test-client")
>     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
>     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>
>     EmbeddedKafaJavaWrapper.start(10001, 10002, props)
>
>     try {
>       implicit val admin = AdminClient.create(props)
>
>       // create topic and confirm it exists
>       createTopic(testTopic)
>       val topics = listTopics()
>       info(s"topics: $topics")
>       topics should contain(testTopic)
>
>       // now we should be able to send something to this topic
>       // TODO create producer and send something
>
>       // delete topic
>       deleteTopic(testTopic)
>       listTopics() shouldNot contain(testTopic)
>
>       // recreate topic
>       createTopic(testTopic)
>       // listTopics() should contain(testTopic)
>
>       // and finally consume from the topic and expect to get 0 entries
>       // TODO create consumer and poll once
>     } finally {
>       EmbeddedKafaJavaWrapper.stop()
>     }
>   }
>
>   def listTopics()(implicit admin: AdminClient) =
>     admin.listTopics().names().get()
>
>   def createTopic(topic: String)(implicit admin: AdminClient) =
>     admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)
>
>   def deleteTopic(topic: String)(implicit admin: AdminClient) =
>     admin.deleteTopics(Seq("test-topic").asJava).all().get()
> }
> {code}
>
> Btw, what happens to connected producers/consumers when I delete a topic?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
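The create → delete → recreate sequence the test above exercises is trivial against a synchronous store. The toy stand-in below (a hypothetical class, not Kafka's API) shows the semantics the reporter expected; the actual ticket exists because real Kafka topic deletion is asynchronous, so a topic can linger in a "marked for deletion" state and recreation can race with it:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory topic admin. Here deletion takes effect immediately,
// so the create/delete/recreate sequence always succeeds -- unlike real Kafka,
// where deletion is asynchronous.
class ToyTopicAdmin {
    private final Set<String> topics = new HashSet<>();

    boolean createTopic(String name) { return topics.add(name); }
    boolean deleteTopic(String name) { return topics.remove(name); }
    Set<String> listTopics()         { return new HashSet<>(topics); }
}
```

Against this stand-in every step of the test passes, which is why the same sequence failing on a broker points at deletion not having completed.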
[jira] [Resolved] (KAFKA-6408) Kafka MirrorMaker doesn't replicate messages when .* regex is used
[ https://issues.apache.org/jira/browse/KAFKA-6408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Waleed Fateem resolved KAFKA-6408.
----------------------------------
    Resolution: Not A Problem

> Kafka MirrorMaker doesn't replicate messages when .* regex is used
> ------------------------------------------------------------------
>
>                 Key: KAFKA-6408
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6408
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 0.11.0.0
>            Reporter: Waleed Fateem
>            Priority: Minor
>
> When using the regular expression .* for the whitelist parameter in Kafka
> MirrorMaker in order to mirror all topics, the MirrorMaker doesn't replicate
> any messages. I was then able to see messages flowing again and being
> replicated between the two Kafka clusters once I changed the whitelist
> configuration to use another regular expression, such as 'topic1 | topic2 |
> topic3'

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-6408) Kafka MirrorMaker doesn't replicate messages when .* regex is used
[ https://issues.apache.org/jira/browse/KAFKA-6408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869618#comment-16869618 ]

Waleed Fateem commented on KAFKA-6408:
--------------------------------------

Just closing the loop here. This is only an issue when you configure Kafka using the Cloudera Manager UI. The problem is with Cloudera Manager and not Kafka, where it's not parsing the regular expression correctly. Marking as resolved.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6408) Kafka MirrorMaker doesn't replicate messages when .* regex is used
[ https://issues.apache.org/jira/browse/KAFKA-6408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307393#comment-16307393 ]

Waleed Fateem edited comment on KAFKA-6408 at 1/1/18 10:06 AM:
---------------------------------------------------------------

The documentation is actually wrong and I created another JIRA to get that corrected: https://issues.apache.org/jira/browse/KAFKA-6301

If you use * you'll see in the MirrorMaker's log that it starts up with a blank whitelist parameter, as opposed to using .*, which shows up correctly.

was (Author: waleedfateem):
The documentation is actually wrong and I created another JIRA to get that corrected: https://issues.apache.org/jira/browse/KAFKA-6301

If you use '*' you'll see in the MirrorMaker's log that it starts up with a blank whitelist parameter, as opposed to using '.*', which shows up correctly.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (KAFKA-6408) Kafka MirrorMaker doesn't replicate messages when .* regex is used
[ https://issues.apache.org/jira/browse/KAFKA-6408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307393#comment-16307393 ]

Waleed Fateem commented on KAFKA-6408:
--------------------------------------

The documentation is actually wrong and I created another JIRA to get that corrected: https://issues.apache.org/jira/browse/KAFKA-6301

If you use '*' you'll see in the MirrorMaker's log that it starts up with a blank whitelist parameter, as opposed to using '.*', which shows up correctly.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (KAFKA-6408) Kafka MirrorMaker doesn't replicate messages when .* regex is used
Waleed Fateem created KAFKA-6408: Summary: Kafka MirrorMaker doesn't replicate messages when .* regex is used Key: KAFKA-6408 URL: https://issues.apache.org/jira/browse/KAFKA-6408 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.11.0.0 Reporter: Waleed Fateem Priority: Minor When using the regular expression .* for the whitelist parameter in Kafka MirrorMaker in order to mirror all topics, the MirrorMaker doesn't replicate any messages. I was then able to see messages flowing again and being replicated between the two Kafka clusters once I changed the whitelist configuration to use another regular expression, such as 'topic1 | topic2 | topic3' -- This message was sent by Atlassian JIRA (v6.4.14#64029)
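The quote-vs-pattern confusion above is easy to reproduce outside MirrorMaker: in java.util.regex (which MirrorMaker uses to compile the whitelist), `.*` matches any topic name, while a bare `*` is a dangling quantifier and is rejected outright. A minimal sketch:

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class WhitelistRegexCheck {
    public static void main(String[] args) {
        // ".*" is a valid regex and matches any topic name
        Pattern all = Pattern.compile(".*");
        System.out.println(all.matcher("topic1").matches()); // true

        // "*" alone is a quantifier with nothing to repeat; java.util.regex rejects it
        try {
            Pattern.compile("*");
        } catch (PatternSyntaxException e) {
            System.out.println("invalid pattern: " + e.getDescription());
        }
    }
}
```

On the command line there is a second trap: an unquoted `*` may be glob-expanded by the shell before MirrorMaker ever sees it, which is why the examples quote the pattern as '.*'.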
[jira] [Commented] (KAFKA-6302) Topic can not be recreated after it is deleted
[ https://issues.apache.org/jira/browse/KAFKA-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276063#comment-16276063 ] Waleed Fateem commented on KAFKA-6302: -- Hi kic, I think you should be able to recreate a topic with the same name without any issues, but this is with the assumption that the topic doesn't already exist. Did you make sure that the topic was deleted properly? Did you run the kafka-topics --zookeeper ZHOST:2181 --describe command after deletion to confirm that your topic was in fact deleted and not in the "marked for deletion" state? What error, if any, did you see when you attempted to create the topic again? I don't believe the topic will be deleted so long as you have clients connected to the topic. I think at that point Kafka is just going to mark the topic for deletion. I would need to run a test to confirm. > Topic can not be recreated after it is deleted > -- > > Key: KAFKA-6302 > URL: https://issues.apache.org/jira/browse/KAFKA-6302 > Project: Kafka > Issue Type: Bug > Components: admin, clients >Affects Versions: 1.0.0 >Reporter: kic > > I use an embedded kafka for unit test. My application relies on the ability > to recreate topics programmatically. Currently it is not possible to > re-create a topic after it has been deleted. 
> {code}
> // needs compile time depedency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0'
> // and 'org.apache.kafka:kafka-clients:1.0.0'
> package kic.kafka.embedded
>
> import java.util.Properties
>
> import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
> import org.scalatest._
>
> import scala.collection.JavaConverters._
>
> class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
>   val props = new Properties()
>   val testTopic = "test-topic"
>
>   "The admin client" should "be able to create, delete and re-create topics" in {
>     props.setProperty("bootstrap.servers", "localhost:10001")
>     props.setProperty("delete.enable.topic", "true")
>     props.setProperty("group.id", "test-client")
>     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
>     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>     props.setProperty("clinet.id", "test-client")
>     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
>     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>
>     EmbeddedKafaJavaWrapper.start(10001, 10002, props)
>
>     try {
>       implicit val admin = AdminClient.create(props)
>       // create topic and confirm it exists
>       createTopic(testTopic)
>       val topics = listTopics()
>       info(s"topics: $topics")
>       topics should contain(testTopic)
>       // now we should be able to send something to this topic
>       // TODO create producer and send something
>       // delete topic
>       deleteTopic(testTopic)
>       listTopics() shouldNot contain(testTopic)
>       // recreate topic
>       createTopic(testTopic)
>       // listTopics() should contain(testTopic)
>       // and finally consume from the topic and expect to get 0 entries
>       // TODO create consumer and poll once
>     } finally {
>       EmbeddedKafaJavaWrapper.stop()
>     }
>   }
>
>   def listTopics()(implicit admin: AdminClient) = admin.listTopics().names().get()
>
>   def createTopic(topic: String)(implicit admin: AdminClient) = admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)
>
>   def deleteTopic(topic: String)(implicit admin: AdminClient) = admin.deleteTopics(Seq("test-topic").asJava).all().get()
> }
> {code}
> Btw, what happens to connected producers/consumers when I delete a topic? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-6301) Incorrect Java Regex example '*' for mirroring all topics
[ https://issues.apache.org/jira/browse/KAFKA-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Waleed Fateem reassigned KAFKA-6301: Assignee: Waleed Fateem > Incorrect Java Regex example '*' for mirroring all topics > - > > Key: KAFKA-6301 > URL: https://issues.apache.org/jira/browse/KAFKA-6301 > Project: Kafka > Issue Type: Bug > Components: documentation >Affects Versions: 0.10.2.0, 0.11.0.0, 1.0.0 >Reporter: Waleed Fateem >Assignee: Waleed Fateem >Priority: Minor > Labels: documentation, mirror-maker > > The documentation for section "Mirroring data between clusters" states the > following: > Or you could mirror all topics using --whitelist '*' > The regular expression should be '.*' instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6301) Incorrect Java Regex example '*' for mirroring all topics
Waleed Fateem created KAFKA-6301: Summary: Incorrect Java Regex example '*' for mirroring all topics Key: KAFKA-6301 URL: https://issues.apache.org/jira/browse/KAFKA-6301 Project: Kafka Issue Type: Bug Components: documentation Affects Versions: 1.0.0, 0.11.0.0, 0.10.2.0 Reporter: Waleed Fateem Priority: Minor The documentation for section "Mirroring data between clusters" states the following: Or you could mirror all topics using --whitelist '*' The regular expression should be '.*' instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5705) Kafka Server start failed and reports "unsafe memory access operation"
[ https://issues.apache.org/jira/browse/KAFKA-5705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16117305#comment-16117305 ] Waleed Fateem commented on KAFKA-5705: -- [~hachikuji] Just wondering if this is actually a different scenario that leads up to the same memory access issue. The error stack reported here looks a bit different than what was reported in KAFKA-5628. > Kafka Server start failed and reports "unsafe memory access operation" > -- > > Key: KAFKA-5705 > URL: https://issues.apache.org/jira/browse/KAFKA-5705 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.2.0 >Reporter: Chen He > >
> [2017-08-02 15:50:23,361] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code
>     at kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply$mcV$sp(TimeIndex.scala:128)
>     at kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply(TimeIndex.scala:107)
>     at kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply(TimeIndex.scala:107)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>     at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:107)
>     at kafka.log.LogSegment.recover(LogSegment.scala:252)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:231)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>     at kafka.log.Log.loadSegments(Log.scala:188)
>     at kafka.log.Log.<init>(Log.scala:116)
>     at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:157)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
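The fault above originates in TimeIndex.maybeAppend, and Kafka's offset and time index files are memory-mapped: when mapped pages cannot be read back (a truncated or corrupt index file, a failing disk), the JVM surfaces the hardware fault as java.lang.InternalError rather than an IOException, which is why it escapes ordinary I/O error handling. As a rough illustration of the mechanism only (not Kafka's actual code), a time-index entry is a 12-byte (timestamp, relative offset) pair written through a MappedByteBuffer; the file name and values here are made up:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class MappedIndexSketch {
    public static void main(String[] args) throws IOException {
        // hypothetical stand-in for a Kafka .timeindex file
        Path file = Files.createTempFile("segment", ".timeindex");
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel ch = raf.getChannel()) {
            // mapping READ_WRITE past the current size extends the file;
            // one entry is an 8-byte timestamp plus a 4-byte relative offset
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, 12);
            buf.putLong(1501685423361L); // timestamp
            buf.putInt(42);              // relative offset
            buf.flip();
            // reads go straight through the page cache; if the backing pages
            // were unreadable, this is where an InternalError would surface
            System.out.println(buf.getLong() + " -> " + buf.getInt()); // prints "1501685423361 -> 42"
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```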