Review Request 36474: Patch for KAFKA-2188
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36474/ ---

Review request for kafka.

Bugs: KAFKA-2188
    https://issues.apache.org/jira/browse/KAFKA-2188

Repository: kafka

Description
---

KAFKA-2188 - JBOD Support

Diffs
---

  core/src/main/scala/kafka/cluster/Partition.scala 2649090b6cbf8d442649f19fd7113a30d62bca91
  core/src/main/scala/kafka/common/GenericKafkaStorageException.scala PRE-CREATION
  core/src/main/scala/kafka/controller/KafkaController.scala b4fc755641b9bbe8a6bf9c221a9ffaec0b94d6e8
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala bb6b5c8764522e7947bb08998256ce1deb717c84
  core/src/main/scala/kafka/log/Log.scala e5e80079645ce6e6fe7bb1c2696d3dd21a07761b
  core/src/main/scala/kafka/log/LogManager.scala 69386c17153e5ef08a24d4f14b915e4316b121d8
  core/src/main/scala/kafka/log/LogSegment.scala 1377e8f322a3fedc683d93feaf27c955de528a4b
  core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0
  core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf
  core/src/main/scala/kafka/server/OffsetCheckpoint.scala 8c5b0546908d3b3affb9f48e2ece9ed252518783
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala c89d00b5976ffa34cafdae261239934b1b917bfe
  core/src/main/scala/kafka/server/ReplicaManager.scala 795220e7f63d163be90738b4c1a39687b44c1395
  core/src/main/scala/kafka/utils/ZkUtils.scala 166814c2959a429e20f400d1c0e523090ce37d91
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala a13f2bef8ee8c3d42192c9a60df092023e4d2ff9
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 60cd8249e6ec03349e20bb0a7226ea9cd66e6b17
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 17e9fe4c159a29033fe9a287db6ced2fdc3fa9d1

Diff: https://reviews.apache.org/r/36474/diff/

Testing
---

Thanks,

Timothy Chen
Re: Build failed in Jenkins: KafkaPreCommit #147
Looks like ConsumerBounceTest is flaky. Does anyone know if there's a JIRA for it? It seemed to be failing more or less at random in the last few builds.

On Mon, Jul 13, 2015 at 10:48 PM, Apache Jenkins Server jenk...@builds.apache.org wrote:

See https://builds.apache.org/job/KafkaPreCommit/147/changes

Changes:

[cshapi] KAFKA-2198: kafka-topics.sh exits with 0 status on failures; patched by Manikumar Reddy reviewed by Gwen Shapira

--
[...truncated 3035 lines...] kafka.consumer.MetricsTest testMetricsReporterAfterDeletingTopic PASSED kafka.consumer.ZookeeperConsumerConnectorTest testCompressionSetConsumption PASSED kafka.consumer.ZookeeperConsumerConnectorTest testBasic PASSED kafka.consumer.ZookeeperConsumerConnectorTest testLeaderSelectionForPartition PASSED kafka.consumer.ZookeeperConsumerConnectorTest testConsumerDecoder PASSED kafka.consumer.ZookeeperConsumerConnectorTest testConsumerRebalanceListener PASSED kafka.consumer.ZookeeperConsumerConnectorTest testCompression PASSED kafka.consumer.ConsumerIteratorTest testConsumerIteratorDecodingFailure PASSED kafka.consumer.ConsumerIteratorTest testConsumerIteratorDeduplicationDeepIterator PASSED kafka.javaapi.consumer.ZookeeperConsumerConnectorTest testBasic PASSED kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytes PASSED kafka.javaapi.message.ByteBufferMessageSetTest testWrittenEqualsRead PASSED kafka.javaapi.message.ByteBufferMessageSetTest testEquals PASSED kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytesWithCompression PASSED kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistentWithCompression PASSED kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistent PASSED kafka.javaapi.message.ByteBufferMessageSetTest testEqualsWithCompression PASSED kafka.server.LogOffsetTest testGetOffsetsBeforeNow PASSED kafka.server.LogOffsetTest testGetOffsetsForUnknownTopic PASSED kafka.server.LogOffsetTest testGetOffsetsBeforeEarliestTime PASSED kafka.server.LogOffsetTest
testEmptyLogsGetOffsets PASSED kafka.server.LogOffsetTest testGetOffsetsBeforeLatestTime PASSED kafka.server.HighwatermarkPersistenceTest testHighWatermarkPersistenceSinglePartition PASSED kafka.server.HighwatermarkPersistenceTest testHighWatermarkPersistenceMultiplePartitions PASSED kafka.server.KafkaConfigTest testInvalidCompressionType PASSED kafka.server.KafkaConfigTest testLogRetentionTimeMinutesProvided PASSED kafka.server.KafkaConfigTest testAdvertisePortDefault PASSED kafka.server.KafkaConfigTest testAdvertiseHostNameDefault PASSED kafka.server.KafkaConfigTest testDuplicateListeners PASSED kafka.server.KafkaConfigTest testBadListenerProtocol PASSED kafka.server.KafkaConfigTest testLogRetentionTimeBothMinutesAndMsProvided PASSED kafka.server.KafkaConfigTest testLogRollTimeMsProvided PASSED kafka.server.KafkaConfigTest testLogRollTimeBothMsAndHoursProvided PASSED kafka.server.KafkaConfigTest testListenerDefaults PASSED kafka.server.KafkaConfigTest testUncleanElectionDisabled PASSED kafka.server.KafkaConfigTest testUncleanElectionEnabled PASSED kafka.server.KafkaConfigTest testLogRetentionValid PASSED kafka.server.KafkaConfigTest testLogRetentionTimeMsProvided PASSED kafka.server.KafkaConfigTest testAdvertiseDefaults PASSED kafka.server.KafkaConfigTest testAdvertiseConfigured PASSED kafka.server.KafkaConfigTest testDefaultCompressionType PASSED kafka.server.KafkaConfigTest testValidCompressionType PASSED kafka.server.KafkaConfigTest testLogRetentionTimeNoConfigProvided PASSED kafka.server.KafkaConfigTest testUncleanElectionInvalid PASSED kafka.server.KafkaConfigTest testVersionConfiguration PASSED kafka.server.KafkaConfigTest testUncleanLeaderElectionDefault PASSED kafka.server.KafkaConfigTest testLogRetentionTimeHoursProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeBothMinutesAndHoursProvided PASSED kafka.server.KafkaConfigTest testLogRetentionUnlimited PASSED kafka.server.KafkaConfigTest testLogRollTimeNoConfigProvided PASSED 
kafka.server.LogRecoveryTest testHWCheckpointWithFailuresSingleLogSegment PASSED kafka.server.LogRecoveryTest testHWCheckpointNoFailuresSingleLogSegment PASSED kafka.server.LogRecoveryTest testHWCheckpointNoFailuresMultipleLogSegments PASSED kafka.server.LogRecoveryTest testHWCheckpointWithFailuresMultipleLogSegments PASSED kafka.server.DelayedOperationTest testRequestExpiry PASSED kafka.server.DelayedOperationTest testRequestSatisfaction PASSED kafka.server.DelayedOperationTest testRequestPurge PASSED kafka.server.AdvertiseBrokerTest testBrokerAdvertiseToZK PASSED kafka.server.ReplicaManagerTest testHighWaterMarkDirectoryMapping PASSED kafka.server.ReplicaManagerTest testIllegalRequiredAcks PASSED kafka.server.ReplicaManagerTest
Build failed in Jenkins: Kafka-trunk #541
See https://builds.apache.org/job/Kafka-trunk/541/changes Changes: [cshapi] KAFKA-2198: kafka-topics.sh exits with 0 status on failures; patched by Manikumar Reddy reviewed by Gwen Shapira -- [...truncated 1443 lines...] kafka.log.LogTest testIndexResizingAtTruncation PASSED kafka.log.LogSegmentTest testCreateWithInitFileSizeAppendMessage PASSED kafka.log.LogSegmentTest testCreateWithInitFileSizeClearShutdown PASSED kafka.log.LogSegmentTest testTruncateFull PASSED kafka.log.LogSegmentTest testNextOffsetCalculation PASSED kafka.log.LogSegmentTest testChangeFileSuffixes PASSED kafka.log.LogSegmentTest testRecoveryFixesCorruptIndex PASSED kafka.log.LogSegmentTest testRecoveryWithCorruptMessage PASSED kafka.log.LogSegmentTest testReadOnEmptySegment PASSED kafka.log.LogSegmentTest testReadBeforeFirstOffset PASSED kafka.log.LogSegmentTest testMaxOffset PASSED kafka.log.LogSegmentTest testReadAfterLast PASSED kafka.log.LogSegmentTest testReadFromGap PASSED kafka.log.LogSegmentTest testTruncate PASSED kafka.log.LogConfigTest testFromPropsToProps PASSED kafka.log.LogConfigTest testFromPropsEmpty PASSED kafka.log.LogConfigTest testFromPropsInvalid PASSED kafka.log.CleanerTest testBuildOffsetMap PASSED kafka.log.CleanerTest testRecoveryAfterCrash PASSED kafka.log.CleanerTest testSegmentGrouping PASSED kafka.log.CleanerTest testSegmentGroupingWithSparseOffsets PASSED kafka.log.CleanerTest testCleaningWithDeletes PASSED kafka.log.CleanerTest testCleaningWithUnkeyedMessages PASSED kafka.log.CleanerTest testCleanSegments PASSED kafka.log.CleanerTest testCleanSegmentsWithAbort PASSED kafka.log.FileMessageSetTest testWrittenEqualsRead PASSED kafka.log.FileMessageSetTest testIteratorIsConsistent PASSED kafka.log.FileMessageSetTest testSizeInBytes PASSED kafka.log.FileMessageSetTest testWriteTo PASSED kafka.log.FileMessageSetTest testIterationOverPartialAndTruncation PASSED kafka.log.FileMessageSetTest testIterationDoesntChangePosition PASSED kafka.log.FileMessageSetTest testRead 
PASSED kafka.log.FileMessageSetTest testSearch PASSED kafka.log.FileMessageSetTest testIteratorWithLimits PASSED kafka.log.FileMessageSetTest testPreallocateTrue PASSED kafka.log.FileMessageSetTest testPreallocateFalse PASSED kafka.log.FileMessageSetTest testPreallocateClearShutdown PASSED kafka.log.FileMessageSetTest testFileSize PASSED kafka.log.FileMessageSetTest testTruncate PASSED kafka.log.OffsetMapTest testBasicValidation PASSED kafka.log.OffsetMapTest testClear PASSED kafka.log.OffsetIndexTest lookupExtremeCases PASSED kafka.log.OffsetIndexTest appendTooMany PASSED kafka.log.OffsetIndexTest appendOutOfOrder PASSED kafka.log.OffsetIndexTest testReopen PASSED kafka.log.OffsetIndexTest randomLookupTest PASSED kafka.log.OffsetIndexTest truncate PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[0] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[1] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[2] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[3] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[4] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[5] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[6] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[7] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[8] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[9] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[10] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[11] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[12] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[13] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[14] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[15] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[16] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[17] PASSED 
kafka.log.BrokerCompressionTest testBrokerSideCompression[18] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[19] PASSED kafka.log.LogManagerTest testGetNonExistentLog PASSED kafka.log.LogManagerTest testCleanupExpiredSegments PASSED kafka.log.LogManagerTest testRecoveryDirectoryMappingWithTrailingSlash PASSED kafka.log.LogManagerTest testRecoveryDirectoryMappingWithRelativeDirectory PASSED kafka.log.LogManagerTest testCreateLog PASSED kafka.log.LogManagerTest testCleanupSegmentsToMaintainSize PASSED kafka.log.LogManagerTest testTimeBasedFlush PASSED kafka.log.LogManagerTest testLeastLoadedAssignment PASSED kafka.log.LogManagerTest testTwoLogManagersUsingSameDirFails PASSED kafka.log.LogManagerTest testCheckpointRecoveryPoints PASSED kafka.log.LogCleanerIntegrationTest
[jira] [Updated] (KAFKA-2198) kafka-topics.sh exits with 0 status on failures
[ https://issues.apache.org/jira/browse/KAFKA-2198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar Reddy updated KAFKA-2198:
---
    Resolution: Fixed
    Fix Version/s: 0.8.3
    Status: Resolved (was: Patch Available)

kafka-topics.sh exits with 0 status on failures
---

Key: KAFKA-2198
URL: https://issues.apache.org/jira/browse/KAFKA-2198
Project: Kafka
Issue Type: Bug
Components: admin
Affects Versions: 0.8.2.1
Reporter: Bob Halley
Assignee: Manikumar Reddy
Fix For: 0.8.3
Attachments: KAFKA-2198.patch, KAFKA-2198_2015-05-19_18:27:01.patch, KAFKA-2198_2015-05-19_18:41:25.patch, KAFKA-2198_2015-07-10_22:02:02.patch, KAFKA-2198_2015-07-10_23:11:23.patch, KAFKA-2198_2015-07-13_19:24:46.patch

In the two failure cases below, kafka-topics.sh exits with status 0. You shouldn't need to parse output from the command to know whether it failed.

Case 1: Forgetting to add the Kafka zookeeper chroot path to the zookeeper spec

$ kafka-topics.sh --alter --topic foo --config min.insync.replicas=2 --zookeeper 10.0.0.1 && echo succeeded
succeeded

Case 2: Bad config option. (Also, do we really need the Java backtrace? It's a lot of noise most of the time.)

$ kafka-topics.sh --alter --topic foo --config min.insync.replicasTYPO=2 --zookeeper 10.0.0.1/kafka && echo succeeded
Error while executing topic command requirement failed: Unknown configuration min.insync.replicasTYPO.
java.lang.IllegalArgumentException: requirement failed: Unknown configuration min.insync.replicasTYPO.
        at scala.Predef$.require(Predef.scala:233)
        at kafka.log.LogConfig$$anonfun$validateNames$1.apply(LogConfig.scala:183)
        at kafka.log.LogConfig$$anonfun$validateNames$1.apply(LogConfig.scala:182)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at kafka.log.LogConfig$.validateNames(LogConfig.scala:182)
        at kafka.log.LogConfig$.validate(LogConfig.scala:190)
        at kafka.admin.TopicCommand$.parseTopicConfigsToBeAdded(TopicCommand.scala:205)
        at kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:103)
        at kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:100)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:100)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
succeeded

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
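Once the fix lands, deployment scripts can branch on the exit status directly instead of scraping output. A minimal sketch of that pattern; `alter_topic` is a hypothetical stand-in for the real `kafka-topics.sh --alter ...` invocation (it simply fails here, so the sketch is self-contained and runnable):

```shell
# Pattern for callers once kafka-topics.sh reports failures via exit status.
# 'alter_topic' is a stand-in for something like:
#   kafka-topics.sh --alter --topic foo --config min.insync.replicas=2 \
#     --zookeeper 10.0.0.1/kafka
alter_topic() {
  return 1  # the real command would exit non-zero on failure after the fix
}

if alter_topic; then
  echo "succeeded"
else
  # $? here is the exit status of the failed condition above
  echo "topic alteration failed (exit status $?)" >&2
  # a real script would abort here instead of silently continuing
fi
```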
[jira] [Commented] (KAFKA-2188) JBOD Support
[ https://issues.apache.org/jira/browse/KAFKA-2188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14625910#comment-14625910 ]

Timothy Chen commented on KAFKA-2188:
-

Created reviewboard https://reviews.apache.org/r/36474/diff/ against branch origin/trunk

JBOD Support
--

Key: KAFKA-2188
URL: https://issues.apache.org/jira/browse/KAFKA-2188
Project: Kafka
Issue Type: Bug
Reporter: Andrii Biletskyi
Assignee: Andrii Biletskyi
Attachments: KAFKA-2188.patch, KAFKA-2188.patch

https://cwiki.apache.org/confluence/display/KAFKA/KIP-18+-+JBOD+Support

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2188) JBOD Support
[ https://issues.apache.org/jira/browse/KAFKA-2188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Chen updated KAFKA-2188:
---
    Attachment: KAFKA-2188.patch

JBOD Support
--

Key: KAFKA-2188
URL: https://issues.apache.org/jira/browse/KAFKA-2188
Project: Kafka
Issue Type: Bug
Reporter: Andrii Biletskyi
Assignee: Andrii Biletskyi
Attachments: KAFKA-2188.patch, KAFKA-2188.patch

https://cwiki.apache.org/confluence/display/KAFKA/KIP-18+-+JBOD+Support

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34641: Patch for KAFKA-2214
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34641/ ---

(Updated July 14, 2015, 10:03 a.m.)

Review request for kafka.

Bugs: KAFKA-2214
    https://issues.apache.org/jira/browse/KAFKA-2214

Repository: kafka

Description
---

Addressing Gwen's comments

Diffs (updated)
---

  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ea345895a52977c25bff57e95e12b8662331d7fe

Diff: https://reviews.apache.org/r/34641/diff/

Testing
---

Thanks,

Manikumar Reddy O
Re: Review Request 34641: Patch for KAFKA-2214
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34641/ ---

(Updated July 14, 2015, 10:13 a.m.)

Review request for kafka.

Bugs: KAFKA-2214
    https://issues.apache.org/jira/browse/KAFKA-2214

Repository: kafka

Description
---

Addressing Gwen's comments

Diffs (updated)
---

  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ea345895a52977c25bff57e95e12b8662331d7fe

Diff: https://reviews.apache.org/r/34641/diff/

Testing
---

Thanks,

Manikumar Reddy O
[jira] [Updated] (KAFKA-2320) Configure GitHub pull request build in Jenkins
[ https://issues.apache.org/jira/browse/KAFKA-2320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-2320:
---
Description:

The details are available in the following Apache Infra post: https://blogs.apache.org/infra/entry/github_pull_request_builds_now

I paste the instructions here as well for convenience:

{quote}
Here’s what you need to do to set it up:

* Create a new job, probably copied from an existing job.
* Make sure you’re not doing any “mvn deploy” or equivalent in the new job - this job shouldn’t be deploying any artifacts to Nexus, etc.
* Check the “Enable Git validated merge support” box - you can leave the first few fields set to their default, since we’re not actually pushing anything. This is just required to get the pull request builder to register correctly.
* Set the “GitHub project” field to the HTTP URL for your repository - i.e., http://github.com/apache/kafka/ - make sure it ends with that trailing slash and doesn’t include .git, etc.
* In the Git SCM section of the job configuration, set the repository URL to point to the GitHub git:// URL for your repository - i.e., git://github.com/apache/kafka.git.
* You should be able to leave the “Branches to build” field as is - this won’t be relevant anyway.
* Click the “Add” button in “Additional Behaviors” and choose “Strategy for choosing what to build”. Make sure the choosing strategy is set to “Build commits submitted for validated merge”.
* Uncheck any existing build triggers - this shouldn’t be running on a schedule, polling, running when SNAPSHOT dependencies are built, etc.
* Check the “Build pull requests to the repository” option in the build triggers.
* Optionally change anything else in the job that you’d like to be different for a pull request build than for a normal build - i.e., any downstream build triggers should probably be removed, you may want to change email recipients, etc.
* Save, and you’re done!
{quote}

was:
The details are available in the following Apache Infra post: https://blogs.apache.org/infra/entry/github_pull_request_builds_now

I paste the instructions here as well for convenience:

{quote}
Here’s what you need to do to set it up:

* Create a new job, probably copied from an existing job.
* Make sure you’re not doing any “mvn deploy” or equivalent in the new job - this job shouldn’t be deploying any artifacts to Nexus, etc.
* Check the “Enable Git validated merge support” box - you can leave the first few fields set to their default, since we’re not actually pushing anything. This is just required to get the pull request builder to register correctly.
* Set the “GitHub project” field to the HTTP URL for your repository - i.e., http://github.com/apache/incubator-brooklyn/ - make sure it ends with that trailing slash and doesn’t include .git, etc.
* In the Git SCM section of the job configuration, set the repository URL to point to the GitHub git:// URL for your repository - i.e., git://github.com/apache/kafka.git.
* You should be able to leave the “Branches to build” field as is - this won’t be relevant anyway.
* Click the “Add” button in “Additional Behaviors” and choose “Strategy for choosing what to build”. Make sure the choosing strategy is set to “Build commits submitted for validated merge”.
* Uncheck any existing build triggers - this shouldn’t be running on a schedule, polling, running when SNAPSHOT dependencies are built, etc.
* Check the “Build pull requests to the repository” option in the build triggers.
* Optionally change anything else in the job that you’d like to be different for a pull request build than for a normal build - i.e., any downstream build triggers should probably be removed, you may want to change email recipients, etc.
* Save, and you’re done!
{quote}

Configure GitHub pull request build in Jenkins
--

Key: KAFKA-2320
URL: https://issues.apache.org/jira/browse/KAFKA-2320
Project: Kafka
Issue Type: Task
Reporter: Ismael Juma
Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review
Hi Jun,

On Tue, Jul 14, 2015 at 2:09 AM, Jun Rao j...@confluent.io wrote:

Ismael, I followed the instructions in KAFKA-2320 and created a new Jenkins job (https://builds.apache.org/job/kafka-trunk-git-pr/). Could you check if it works? Thanks!

It seems to be building when trunk changes as opposed to when PRs are submitted (e.g. https://github.com/apache/kafka/pull/74). I'll see if I can get some help from INFRA.

As for the wiki, I have a couple of minor comments.

a. Could we add the following to the wiki? To avoid conflicts, assign a jira to yourself if you plan to work on it. If you are creating a jira and don't plan to work on it, leave the assignee as Unassigned.

Good point, done.

b. Previously, we mark a jira as Patch Available if there is a patch. Could we reuse that instead of In Progress to be consistent? Also, if a patch needs more work after review, the reviewer will mark the jira back to In Progress.

I added this as an additional step for the user because I don't think we can customise the behaviour of the pull request bot (which sets the status to In Progress). It's a bit redundant when a PR is created, so a bit annoying. I'll double-check with INFRA. I also mentioned in the review section that the reviewer changes the JIRA status back to In Progress.

Best,
Ismael
Re: Review Request 34641: Patch for KAFKA-2214
On May 26, 2015, 7:08 a.m., Michael Noll wrote:

core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala, line 81
https://reviews.apache.org/r/34641/diff/1/?file=971195#file971195line81

Should we also consider reassignments that are in-progress as errors? The reasoning is that you'd like to do the following in (say) Ansible: Trigger reassignment of partitions, wait until all are completed, and only then continue with the next action. That being said, there are other ways to achieve this in tools like Ansible. For instance, you can trigger reassignments, then repeatedly call `--verify` in a loop and capture its STDOUT, looking for "is still in progress" and "failed". However, this is arguably more error prone because the log messages can change between Kafka versions (and oftentimes such changes are not prominently advertised, so you only notice this once your deployment script breaks).

Manikumar Reddy O wrote:

Yes, we can consider in-progress as errors. Another option could be returning a different error code. Let us wait for others' suggestions/concerns.

Gwen Shapira wrote:

I wouldn't consider in-progress an error, since in-progress is pretty much the expected behavior here (reassigning takes a while and the tool exits immediately). As you mentioned, the Ansible script should include triggering re-assignment and then polling periodically until it's completed (like you would for HDFS rebalance, an Oozie job, an MR job, etc.). Perhaps just commenting on the --verify output that this is a public API and should never change would be OK?

Gwen Shapira wrote:

Sorry, I got confused. You meant: triggering a new re-assignment while the old one is still in progress. I agree this should be an error. (But I still think the Ansible script should use --verify, otherwise you risk triggering a new re-assignment accidentally.)

A different exit code is given for in-progress reassignments.
exitCode = 0 for ReassignmentCompleted
exitCode = 1 for ReassignmentInProgress
exitCode = 2 for ReassignmentFailed

- Manikumar Reddy

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34641/#review85154 ---

On July 14, 2015, 10:03 a.m., Manikumar Reddy O wrote:

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34641/ ---

(Updated July 14, 2015, 10:03 a.m.)

Review request for kafka.

Bugs: KAFKA-2214
    https://issues.apache.org/jira/browse/KAFKA-2214

Repository: kafka

Description
---

Addressing Gwen's comments

Diffs
---

  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ea345895a52977c25bff57e95e12b8662331d7fe

Diff: https://reviews.apache.org/r/34641/diff/

Testing
---

Thanks,

Manikumar Reddy O
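With distinct exit codes, a deployment tool can poll `--verify` without scraping log text, as discussed above. A sketch of that loop, assuming the codes described in this review (0 = completed, 1 = in progress, 2 = failed); `verify_reassignment` is a hypothetical stand-in for the real `--verify` invocation and simulates two in-progress polls before completing, so the sketch runs anywhere:

```shell
# Poll until reassignment completes, branching on exit codes instead of
# parsing "is still in progress" from stdout.
attempts=0
verify_reassignment() {
  # Stand-in for: kafka-reassign-partitions.sh --verify ...
  attempts=$((attempts + 1))
  [ "$attempts" -ge 3 ]  # exits 1 (in progress) twice, then 0 (completed)
}

wait_for_reassignment() {
  while true; do
    rc=0
    verify_reassignment || rc=$?
    case "$rc" in
      0) echo "reassignment completed"; return 0 ;;
      1) echo "still in progress, polling again" ;;  # sleep between polls in real use
      *) echo "reassignment failed (exit $rc)" >&2; return $rc ;;
    esac
  done
}

wait_for_reassignment
```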
[GitHub] kafka pull request: KAFKA-1595; Remove deprecated and slower scala...
GitHub user ijuma opened a pull request:

    https://github.com/apache/kafka/pull/74

KAFKA-1595; Remove deprecated and slower scala JSON parser

A thin wrapper over Jackson's Tree Model API is used as the replacement. This wrapper increases safety while providing a simple, but powerful API through the usage of the `DecodeJson` type class. Even though this has a maintenance cost, it makes the API much more convenient from Scala. A number of tests were added to verify the behaviour of this wrapper.

The Scala module for Jackson doesn't provide any help for our current usage, so we don't depend on it.

An attempt has been made to maintain the existing behaviour regarding when exceptions are thrown. There are a number of cases where `JsonMappingException` will be thrown instead of `ClassCastException`, however. It is expected that users would not try to catch `ClassCastException`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ijuma/kafka kafka-1595-remove-deprecated-json-parser-jackson

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/74.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #74

commit 61f20cc04a89200c28eb77137671235516c81847
Author: Ismael Juma ism...@juma.me.uk
Date: 2015-04-20T20:53:54Z

    Introduce `testJsonParse`

    Simple test that shows existing behaviour.

commit 4ca0feb37e8be2d388b60efacc19bc6788b6
Author: Ismael Juma ism...@juma.me.uk
Date: 2015-04-21T00:15:02Z

    KAFKA-1595; Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount

    A thin wrapper over Jackson's Tree Model API is used as the replacement. This wrapper increases safety while providing a simple, but powerful API through the usage of the `DecodeJson` type class. Even though this has a maintenance cost, it makes the API much more convenient from Scala. A number of tests were added to verify the behaviour of this wrapper. The Scala module for Jackson doesn't provide any help for our current usage, so we don't depend on it. An attempt has been made to maintain the existing behaviour regarding when exceptions are thrown. There are a number of cases where `JsonMappingException` will be thrown instead of `ClassCastException`, however. It is expected that users would not try to catch `ClassCastException`.

commit f401990f13bddbd3d97e05756cf2f1abf367677e
Author: Ismael Juma ism...@juma.me.uk
Date: 2015-04-21T00:23:39Z

    Minor clean-ups in `Json.encode`

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
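The type-class pattern the PR describes can be sketched in a few lines of Scala. This is an illustrative reconstruction, not the PR's actual code: names and shapes are hypothetical, and plain Scala values stand in for Jackson's `JsonNode` so the sketch is self-contained. The point is that each `DecodeJson[T]` instance converts an untyped parsed value into a `T`, surfacing failure as a value rather than a `ClassCastException`.

```scala
// Illustrative sketch of a DecodeJson type class (hypothetical names, not
// the PR's code). An instance per target type decodes an untyped parsed
// value, reporting failure as a Left instead of throwing.
trait DecodeJson[T] {
  def decode(value: Any): Either[String, T]
}

object DecodeJson {
  // Summon the implicit instance and apply it.
  def decode[T](value: Any)(implicit d: DecodeJson[T]): Either[String, T] =
    d.decode(value)

  implicit val decodeString: DecodeJson[String] = new DecodeJson[String] {
    def decode(value: Any): Either[String, String] = value match {
      case s: String => Right(s)
      case other     => Left(s"expected string, found: $other")
    }
  }

  implicit val decodeInt: DecodeJson[Int] = new DecodeJson[Int] {
    def decode(value: Any): Either[String, Int] = value match {
      case n: Int => Right(n)
      case other  => Left(s"expected int, found: $other")
    }
  }

  // Instances compose: a decoder for List[T] exists whenever one for T does.
  implicit def decodeList[T](implicit d: DecodeJson[T]): DecodeJson[List[T]] =
    new DecodeJson[List[T]] {
      def decode(value: Any): Either[String, List[T]] = value match {
        case xs: List[_] =>
          xs.foldRight(Right(Nil): Either[String, List[T]]) { (x, acc) =>
            for (t <- d.decode(x); ts <- acc) yield t :: ts
          }
        case other => Left(s"expected list, found: $other")
      }
    }
}
```

Composition through implicits is what makes this convenient from Scala: `DecodeJson.decode[List[Int]](...)` works without any extra wiring once the element instance exists.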
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626122#comment-14626122 ]

Stefan Miklosovic commented on KAFKA-1835:
--

Awesome discussion, guys! I love having it laid out so clearly and at a low level; I didn't know exactly what was happening before.

Since I send messages to Kafka via the new Producer on behalf of another component, if I wrap the producer in a try-catch and act accordingly when it fails to fetch the metadata, I can send a response back to the original caller saying that its request to send data to the topic was unsuccessful. However, if I understand correctly, the producer's failure to receive information about the topic on the first send fires a new metadata request, so any subsequent producer requests should be fine.

I don't mind the initial blocking behaviour at all. However, I am not quite sure why it times out in the first place. If the topics are up and running and communication from producer to broker is fine, I don't see any reason why it should time out after 60 seconds.

Kafka new producer needs options to make blocking behavior explicit
---

Key: KAFKA-1835
URL: https://issues.apache.org/jira/browse/KAFKA-1835
Project: Kafka
Issue Type: Improvement
Components: clients
Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
Reporter: Paul Pearcy
Fix For: 0.8.3
Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch
Original Estimate: 504h
Remaining Estimate: 504h

The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path.
Discussing on the mailing list, the most viable option is to have the following settings:

pre.initialize.topics=x,y,z
pre.initialize.timeout=x

This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time.

There is the question of what to do when initialization fails. There are a couple of options that I'd like available:

- Fail creation of the client
- Fail all sends until the meta is available

Open to input on how the above option should be expressed.

It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages (note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point.

I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool).

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
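The "fail all sends until the meta is available" option, combined with the atomic-bool init tracking mentioned at the end, can be sketched as follows. This is an illustrative sketch only, not the attached patch: `GuardedSender` and `realSend` are hypothetical names, and the real producer integration is elided.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative sketch (not the actual patch): an atomic flag tracks whether
// the (possibly slow) metadata initialization has completed, so send() can
// fail fast instead of blocking in a critical request path.
final class GuardedSender(realSend: (String, Array[Byte]) => Unit) {
  private val metadataReady = new AtomicBoolean(false)

  // Called once pre-initialization (e.g. for pre.initialize.topics) finishes.
  def markInitialized(): Unit = metadataReady.set(true)

  // Left = rejected immediately, no blocking; Right = handed to the real producer.
  def send(topic: String, value: Array[Byte]): Either[String, Unit] =
    if (!metadataReady.get) Left(s"metadata for $topic not yet available; send rejected")
    else Right(realSend(topic, value))
}
```

The caller decides what a rejected send means (drop the message, queue it, or surface an error), which is exactly the policy question the ticket leaves open.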
[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626050#comment-14626050 ] ASF GitHub Bot commented on KAFKA-1595: --- GitHub user ijuma opened a pull request: https://github.com/apache/kafka/pull/74 KAFKA-1595; Remove deprecated and slower scala JSON parser A thin wrapper over Jackson's Tree Model API is used as the replacement. This wrapper increases safety while providing a simple, but powerful API through the usage of the `DecodeJson` type class. Even though this has a maintenance cost, it makes the API much more convenient from Scala. A number of tests were added to verify the behaviour of this wrapper. The Scala module for Jackson doesn't provide any help for our current usage, so we don't depend on it. An attempt has been made to maintain the existing behaviour regarding when exceptions are thrown. There are a number of cases where `JsonMappingException` will be thrown instead of `ClassCastException`, however. It is expected that users would not try to catch `ClassCastException`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ijuma/kafka kafka-1595-remove-deprecated-json-parser-jackson Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/74.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #74 commit 61f20cc04a89200c28eb77137671235516c81847 Author: Ismael Juma ism...@juma.me.uk Date: 2015-04-20T20:53:54Z Introduce `testJsonParse` Simple test that shows existing behaviour. commit 4ca0feb37e8be2d388b60efacc19bc6788b6 Author: Ismael Juma ism...@juma.me.uk Date: 2015-04-21T00:15:02Z KAFKA-1595; Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount A thin wrapper over Jackson's Tree Model API is used as the replacement. 
This wrapper increases safety while providing a simple, but powerful API through the usage of the `DecodeJson` type class. Even though this has a maintenance cost, it makes the API much more convenient from Scala. A number of tests were added to verify the behaviour of this wrapper. The Scala module for Jackson doesn't provide any help for our current usage, so we don't depend on it. An attempt has been made to maintain the existing behaviour regarding when exceptions are thrown. There are a number of cases where `JsonMappingException` will be thrown instead of `ClassCastException`, however. It is expected that users would not try to catch `ClassCastException`. commit f401990f13bddbd3d97e05756cf2f1abf367677e Author: Ismael Juma ism...@juma.me.uk Date: 2015-04-21T00:23:39Z Minor clean-ups in `Json.encode` Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount - Key: KAFKA-1595 URL: https://issues.apache.org/jira/browse/KAFKA-1595 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Jagbir Assignee: Ismael Juma Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1595.patch The following issue is created as a follow-up suggested by Jun Rao in a kafka news group message with the subject "Blocking Recursive parsing from kafka.consumer.TopicCount$.constructTopicCount" SUMMARY: An issue was detected in a typical cluster of 3 kafka instances backed by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3, java version 1.7.0_65). On the consumer end, when consumers get recycled, there is a troubling JSON parsing recursion which takes a busy lock and blocks the consumer thread pool.
In the 0.8.1.1 scala client library, ZookeeperConsumerConnector.scala:355 takes a global lock (0xd3a7e1d0) during the rebalance and fires an expensive JSON parse, while keeping the other consumers from shutting down, see, e.g., kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161). The deep recursive JSON parsing should be deprecated in favor of a better JSON parser, see, e.g., http://engineering.ooyala.com/blog/comparing-scala-json-libraries? DETAILS: The first dump is for a recursive blocking thread holding the lock for 0xd3a7e1d0 and the subsequent dump is for a waiting thread. (Please grep for 0xd3a7e1d0 to see the locked object.) -8- Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor prio=10 tid=0x7f24dc285800
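The safety idea behind the PR's wrapper — decoding already-parsed JSON returns an empty result on a type mismatch instead of throwing `ClassCastException` — can be sketched outside Scala. The Java stand-in below is hypothetical (the `Decode` interface and `field` helper are not the PR's `DecodeJson` API) and uses a plain `Map` in place of Jackson's Tree Model:

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for a type-class-style decoder: each Decode<T>
// turns an untyped JSON value into Optional<T> rather than casting.
interface Decode<T> extends Function<Object, Optional<T>> {}

public class JsonDecodeSketch {
    static final Decode<Integer> INT = v ->
        (v instanceof Integer) ? Optional.of((Integer) v) : Optional.empty();
    static final Decode<String> STRING = v ->
        (v instanceof String) ? Optional.of((String) v) : Optional.empty();

    // Decode one field of an already-parsed JSON object (here a plain Map).
    static <T> Optional<T> field(Map<String, ?> obj, String name, Decode<T> dec) {
        Object v = obj.get(name);
        return v == null ? Optional.empty() : dec.apply(v);
    }

    public static void main(String[] args) {
        Map<String, Object> obj = Map.of("partition", 3, "topic", "t1");
        System.out.println(field(obj, "partition", INT).orElse(-1));       // prints 3
        System.out.println(field(obj, "partition", STRING).isPresent());   // prints false: mismatch, no ClassCastException
    }
}
```

A mismatched decoder yields an empty `Optional` instead of an exception, which is the property the wrapper's tests verify.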
[jira] [Updated] (KAFKA-2214) kafka-reassign-partitions.sh --verify should return non-zero exit codes when reassignment is not completed yet
[ https://issues.apache.org/jira/browse/KAFKA-2214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-2214: --- Attachment: KAFKA-2214_2015-07-14_15:40:49.patch kafka-reassign-partitions.sh --verify should return non-zero exit codes when reassignment is not completed yet -- Key: KAFKA-2214 URL: https://issues.apache.org/jira/browse/KAFKA-2214 Project: Kafka Issue Type: Improvement Components: admin Affects Versions: 0.8.1.1, 0.8.2.0 Reporter: Michael Noll Assignee: Manikumar Reddy Priority: Minor Attachments: KAFKA-2214.patch, KAFKA-2214_2015-07-10_21:56:04.patch, KAFKA-2214_2015-07-13_21:10:58.patch, KAFKA-2214_2015-07-14_15:31:12.patch, KAFKA-2214_2015-07-14_15:40:49.patch h4. Background The admin script {{kafka-reassign-partitions.sh}} should integrate better with automation tools such as Ansible, which rely on scripts adhering to Unix best practices such as appropriate exit codes on success/failure. h4. Current behavior (incorrect) When reassignments are still in progress {{kafka-reassign-partitions.sh}} prints {{ERROR}} messages but returns an exit code of zero, which indicates success. This behavior makes it a bit cumbersome to integrate the script into automation tools such as Ansible. {code} $ kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file partitions-to-move.json --verify Status of partition reassignment: ERROR: Assigned replicas (316,324,311) don't match the list of replicas for reassignment (316,324) for partition [mytopic,2] Reassignment of partition [mytopic,0] completed successfully Reassignment of partition [myothertopic,1] completed successfully Reassignment of partition [myothertopic,3] completed successfully ... $ echo $? 0 # But preferably the exit code in the presence of ERRORs should be, say, 1. {code} h3. How to improve I'd suggest that, using the above as the running example, if there are any {{ERROR}} entries in the output (i.e. 
if there are any assignments remaining that don't match the desired assignments), then {{kafka-reassign-partitions.sh}} should return a non-zero exit code. h3. Notes In Kafka 0.8.2 the output is a bit different: the ERROR messages are now phrased differently. Before: {code} ERROR: Assigned replicas (316,324,311) don't match the list of replicas for reassignment (316,324) for partition [mytopic,2] {code} Now: {code} Reassignment of partition [mytopic,2] is still in progress {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
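The proposed behavior can be sketched as a small helper that maps the {{--verify}} output to a process exit code. All names below are illustrative, not Kafka's actual code; it recognizes both the 0.8.1 "ERROR:" phrasing and the 0.8.2 "is still in progress" phrasing:

```java
import java.util.List;

// Hypothetical sketch of the suggested fix: exit 0 only when every
// partition reassignment in the --verify output completed.
public class VerifyExitCode {
    static int exitCodeFor(List<String> outputLines) {
        for (String line : outputLines) {
            // 0.8.1 phrasing starts with "ERROR:"; 0.8.2 says "is still in progress".
            if (line.startsWith("ERROR:") || line.contains("is still in progress")) {
                return 1; // non-zero so tools like Ansible can detect failure
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        List<String> out = List.of(
            "Status of partition reassignment:",
            "Reassignment of partition [mytopic,2] is still in progress",
            "Reassignment of partition [mytopic,0] completed successfully");
        System.out.println(exitCodeFor(out)); // prints 1
    }
}
```

An automation tool can then rely on `$?` directly instead of grepping the script's stdout.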
[jira] [Commented] (KAFKA-2214) kafka-reassign-partitions.sh --verify should return non-zero exit codes when reassignment is not completed yet
[ https://issues.apache.org/jira/browse/KAFKA-2214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626157#comment-14626157 ] Manikumar Reddy commented on KAFKA-2214: Updated reviewboard https://reviews.apache.org/r/34641/diff/ against branch origin/trunk -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2214) kafka-reassign-partitions.sh --verify should return non-zero exit codes when reassignment is not completed yet
[ https://issues.apache.org/jira/browse/KAFKA-2214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626148#comment-14626148 ] Manikumar Reddy commented on KAFKA-2214: Updated reviewboard https://reviews.apache.org/r/34641/diff/ against branch origin/trunk -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2214) kafka-reassign-partitions.sh --verify should return non-zero exit codes when reassignment is not completed yet
[ https://issues.apache.org/jira/browse/KAFKA-2214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-2214: --- Attachment: KAFKA-2214_2015-07-14_15:31:12.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2300) Error in controller log when broker tries to rejoin cluster
[ https://issues.apache.org/jira/browse/KAFKA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-2300: Attachment: KAFKA-2300.patch Attaching a preliminary patch in case anyone is willing to give a hand. As I described before, one problem is that calls like sendRequest can throw an exception, and if one is thrown, then the state of the ControllerBrokerRequestBatch object can be left broken (requests are not sent and newBatch calls keep throwing exceptions). The attached patch catches exceptions that calls like sendRequest might throw, cleans the state, and throws an IllegalStateException. Cleaning the state can be problematic if we don't handle the IllegalStateException appropriately. For now, at least in the call path of the topic deletion, I'm suggesting that we make the controller resign, but this could be overkill. If anyone is willing to chime in, I'd appreciate suggestions around the best way of dealing with such a controller in an illegal state. Error in controller log when broker tries to rejoin cluster --- Key: KAFKA-2300 URL: https://issues.apache.org/jira/browse/KAFKA-2300 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Johnny Brown Assignee: Flavio Junqueira Attachments: KAFKA-2300.patch Hello Kafka folks, We are having an issue where a broker attempts to join the cluster after being restarted, but is never added to the ISR for its assigned partitions. This is a three-node cluster, and the controller is broker 2. When broker 1 starts, we see the following message in broker 2's controller.log. {{ [2015-06-23 13:57:16,535] ERROR [BrokerChangeListener on Controller 2]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener) java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one.
Some UpdateMetadata state changes Map(2 - Map([prod-sver-end,1] - (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)), 1 - Map([prod-sver-end,1] - (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)), 3 - Map([prod-sver-end,1] - (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1))) might be lost at kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:202) at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:974) at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:399) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:371) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) }} {{prod-sver-end}} is a topic we previously deleted. 
It seems some remnant of it persists in the controller's memory, causing an exception which interrupts the state change triggered by the broker startup. Has anyone seen something like this? Any idea what's happening here? Any information would be greatly appreciated. Thanks, Johnny -- This message was sent by Atlassian JIRA (v6.3.4#6332)
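The failure mode and the patch's clean-then-rethrow idea can be sketched as follows. This is a simplified stand-in, not the actual ControllerBrokerRequestBatch: if a send throws and the pending batch is never cleared, every later newBatch() call fails with the IllegalStateException seen in the log above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified sketch of the batch-state problem discussed in KAFKA-2300.
public class RequestBatchSketch {
    private final List<String> pending = new ArrayList<>();

    void newBatch() {
        // Mirrors the check that produced the error in the controller log.
        if (!pending.isEmpty())
            throw new IllegalStateException("batch is not empty while creating a new one");
    }

    void add(String request) { pending.add(request); }

    // The patch's idea: on a send failure, clean the state before rethrowing,
    // so the batch object is not left permanently broken.
    void sendRequestsToBrokers(Consumer<String> send) {
        try {
            for (String r : pending) send.accept(r);
            pending.clear();
        } catch (RuntimeException e) {
            pending.clear(); // clean state so subsequent newBatch() succeeds
            throw new IllegalStateException("failed to send batch", e);
        }
    }
}
```

Without the `catch` branch, a single failed send would wedge the controller until it resigns or restarts.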
[jira] [Comment Edited] (KAFKA-2300) Error in controller log when broker tries to rejoin cluster
[ https://issues.apache.org/jira/browse/KAFKA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626247#comment-14626247 ] Flavio Junqueira edited comment on KAFKA-2300 at 7/14/15 1:13 PM: -- Attaching a preliminary patch in the case anyone is willing to give a hand. As I described before, one problem is that calls like sendRequest can throw an exception and if one is thrown, then the state of the ControllerBrokerRequestBatch object can be left broken (requests are not sent and newBatch calls keep throwing exceptions). The attached patch catches exceptions that calls like sendRequest might throw, cleans the state, and throws an IllegalStateException. Cleaning the state can be problematic if we don't handle the IllegalStateException appropriately. For now, at least in the call path of the topic deletion, I'm suggesting that we make the controller resign, but this could be overkill. If anyone is willing to chime in, I'd appreciate suggestions around the best way of dealing with such a controller in an illegal state. was (Author: fpj): Attaching a preliminary patch in the case anyone is willing to give a hand. As I described before, one problem is that calls like sendRequest can throw an exception and if one is thrown, then the state of the ControllerBrokerRequestBatch object can be left broken (requests are not sent and newBatch calls keep throwing exceptions. The attached patch catches exceptions that calls like sendRequest might throw, cleans the state, and throws an IllegalStateException. Cleaning the state can be problematic if we don't handle the IllegalStateException appropriately. For now, at least in the call path of the topic deletion, I'm suggesting that we make the controller resign, but this could be overkill. If anyone is willing to chime in, I'd appreciate suggestions around the best way of dealing with such a controller in an illegal state. 
Error in controller log when broker tries to rejoin cluster --- Key: KAFKA-2300 URL: https://issues.apache.org/jira/browse/KAFKA-2300 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Johnny Brown Assignee: Flavio Junqueira Attachments: KAFKA-2300.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 35867: Patch for KAFKA-1901
On July 14, 2015, 1:59 a.m., Joel Koshy wrote: core/src/main/scala/kafka/common/AppInfo.scala, line 27 https://reviews.apache.org/r/35867/diff/3/?file=1004947#file1004947line27 Per the comment in the previous diff, I think this can go now, right? i.e., kafka server depends on clients, so if you browse mbeans you will see two app-infos registered (under `kafka.server` and `kafka.common`), which is weird. The server will also expose app-info via the clients package since it already uses kafka metrics and the associated jmx reporter. Old AppInfo registration is removed. Now on Kafka Server, we will only see one app-info mbean. On July 14, 2015, 1:59 a.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java, line 27 https://reviews.apache.org/r/35867/diff/3/?file=1004944#file1004944line27 (For consistency) should we make this 40 chars wide, as a standard commit id is? Or we can just go with an eight-char or 16-char wide id for both this and the actual commit id. OK, I am going with a 16-char id. On July 14, 2015, 1:59 a.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java, line 246 https://reviews.apache.org/r/35867/diff/3/?file=1004943#file1004943line246 In https://issues.apache.org/jira/browse/KAFKA-1901?focusedCommentId=14294803page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14294803 I was wondering if we could have a commit fingerprint - i.e., the long value of the most-significant eight bytes of the commit modulo 10k or something like that. This would make it convenient to register as a measurable `KafkaMetric` that people can then use in their deployment monitoring. i.e., instantly look at a graph and say whether all brokers/clients are running the same version or not. Implemented a simple fingerprint (commitId.hashCode()) mechanism. I felt commitId.hashCode() should be sufficient for our requirement. - Manikumar Reddy --- This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/35867/#review91574 --- On July 14, 2015, 12:32 p.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35867/ --- (Updated July 14, 2015, 12:32 p.m.) Review request for kafka. Bugs: KAFKA-1901 https://issues.apache.org/jira/browse/KAFKA-1901 Repository: kafka Description --- Addressing Joel's comments Diffs - build.gradle d86f1a8b25197d53f11e16c54a6854487e175649 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java 6b9590c418aedd2727544c5dd23c017b4b72467a clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java 07b1b60d3a9cb1a399a2fe95b87229f64f539f3b clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 544e120594de78c43581a980b1e4087b4fb98ccb core/src/main/scala/kafka/common/AppInfo.scala d642ca555f83c41451d4fcaa5c01a1f86eff0a1c core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/main/scala/kafka/server/KafkaServerStartable.scala 1c1b75b4137a8b233b61739018e9cebcc3a34343 Diff: https://reviews.apache.org/r/35867/diff/ Testing --- Thanks, Manikumar Reddy O
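The fingerprint mechanism settled on in this review can be sketched as below. This is a simplified stand-in, not the actual AppInfoParser code; the method names are illustrative, though the review does settle on commitId.hashCode() and a 16-char short commit id:

```java
// Sketch of the "commit fingerprint" idea: derive a small numeric value
// from the commit id so it can be exposed as a measurable metric and
// compared across brokers/clients at a glance on a monitoring graph.
public class CommitFingerprint {
    // The review's chosen approach: a stable int per commit id.
    static int fingerprint(String commitId) {
        return commitId == null ? 0 : commitId.hashCode();
    }

    // 16-char short id, truncated from the full 40-char git commit hash.
    static String shortCommitId(String commitId) {
        return commitId.length() <= 16 ? commitId : commitId.substring(0, 16);
    }

    public static void main(String[] args) {
        String id = "61f20cc04a89200c28eb77137671235516c81847";
        System.out.println(shortCommitId(id));                    // prints 61f20cc04a89200c
        System.out.println(fingerprint(id) == fingerprint(id));   // prints true: stable per commit
    }
}
```

Because the fingerprint is deterministic, all processes built from the same commit report the same value, so a divergent broker stands out immediately on a graph.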
[jira] [Updated] (KAFKA-1901) Move Kafka version to be generated in code by build (instead of in manifest)
[ https://issues.apache.org/jira/browse/KAFKA-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1901: --- Attachment: KAFKA-1901_2015-07-14_17:59:56.patch Move Kafka version to be generated in code by build (instead of in manifest) Key: KAFKA-1901 URL: https://issues.apache.org/jira/browse/KAFKA-1901 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.0 Reporter: Jason Rosenberg Assignee: Manikumar Reddy Attachments: KAFKA-1901.patch, KAFKA-1901_2015-06-26_13:16:29.patch, KAFKA-1901_2015-07-10_16:42:53.patch, KAFKA-1901_2015-07-14_17:59:56.patch With 0.8.2 (rc2), I've started seeing this warning in the logs of apps deployed to our staging (both server and client): {code} 2015-01-23 00:55:25,273 WARN [async-message-sender-0] common.AppInfo$ - Can't read Kafka version from MANIFEST.MF. Possible cause: java.lang.NullPointerException {code} The issue is that in our deployment, apps are deployed with single 'shaded' jars (e.g. using the maven shade plugin). This means the MANIFEST.MF file won't have a kafka version. Instead, suggest the kafka build generate the proper version in code, as part of the build. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
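The warning above stems from the manifest lookup returning null inside a shaded jar. The fix's direction — fall back to a constant written by the build — can be sketched as follows; the GENERATED_VERSION constant and method names here are hypothetical, not Kafka's actual code:

```java
// Sketch of the KAFKA-1901 idea: prefer the manifest version when present,
// but fall back to a build-generated constant so shaded jars still report
// a version instead of null/"unknown".
public class VersionLookup {
    // In the real fix this value would be generated by the build, not hard-coded.
    static final String GENERATED_VERSION = "0.8.3-SNAPSHOT";

    static String kafkaVersion(Package pkg) {
        // Package.getImplementationVersion() reads MANIFEST.MF; shaded jars
        // typically lose these attributes, yielding null.
        String manifestVersion = (pkg == null) ? null : pkg.getImplementationVersion();
        return manifestVersion != null ? manifestVersion : GENERATED_VERSION;
    }
}
```

With the fallback in place, the NullPointerException-driven WARN in the issue description cannot occur.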
Re: Review Request 35867: Patch for KAFKA-1901
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35867/ --- (Updated July 14, 2015, 12:32 p.m.) Review request for kafka. Bugs: KAFKA-1901 https://issues.apache.org/jira/browse/KAFKA-1901 Repository: kafka Description (updated) --- Addressing Joel's comments Diffs (updated) - build.gradle d86f1a8b25197d53f11e16c54a6854487e175649 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java 6b9590c418aedd2727544c5dd23c017b4b72467a clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java 07b1b60d3a9cb1a399a2fe95b87229f64f539f3b clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 544e120594de78c43581a980b1e4087b4fb98ccb core/src/main/scala/kafka/common/AppInfo.scala d642ca555f83c41451d4fcaa5c01a1f86eff0a1c core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/main/scala/kafka/server/KafkaServerStartable.scala 1c1b75b4137a8b233b61739018e9cebcc3a34343 Diff: https://reviews.apache.org/r/35867/diff/ Testing --- Thanks, Manikumar Reddy O
[jira] [Commented] (KAFKA-1901) Move Kafka version to be generated in code by build (instead of in manifest)
[ https://issues.apache.org/jira/browse/KAFKA-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626292#comment-14626292 ] Manikumar Reddy commented on KAFKA-1901: Updated reviewboard https://reviews.apache.org/r/35867/diff/ against branch origin/trunk -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Build failed in Jenkins: KafkaPreCommit #147
I think there is no JIRA ticket created for ConsumerBounceTest yet. We can create a new one for it. On Tue, Jul 14, 2015 at 4:12 AM, Ismael Juma ism...@juma.me.uk wrote: On Tue, Jul 14, 2015 at 7:01 AM, Gwen Shapira csh...@gmail.com wrote: Looks like ConsumerBounceTest is flaky. Anyone knows if there's a JIRA for it? It seemed to be failing more-or-less at random in the last few builds. I haven't seen one. Another test that is quite flaky for me is `SocketServerTest`. It would be really nice to make the tests less flaky. I see that there's a patch for ServerShutdownTest in KAFKA-1858 (originally reported by you Gwen). Maybe worth reviewing and integrating. Best, Ismael -- -- Guozhang
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626627#comment-14626627 ] Guozhang Wang commented on KAFKA-1835: -- [~becket_qin][~smiklosovic][~ewencp] I think a related issue that has also been discussed is KAFKA-1788 / KAFKA-2142: even if a partition is available when we do the checking at send() and hence append it to the buffer, it could still become unavailable (even forever) when the sender thread tries to drain the buffer and send to the brokers. So thinking about it a bit more, I feel that having the producer only enqueue records when the partition is known cannot really provide any strict guarantees, and we can probably skip the checking and instead use the timeout value to drop buffered messages and trigger callbacks with an error, as proposed in KAFKA-2142. Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request.
The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above options should be expressed. It is also worth noting that more nuanced solutions exist that could work without the extra settings; they just end up having extra complications and at the end of the day don't add much value. For instance, the producer could accept and queue messages (note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in Scala, but porting to Java shouldn't be a big deal (I was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
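The "fail all sends until the meta is available" option described above can be sketched with a non-blocking check in send(). Everything below is a hypothetical illustration (the class and method names are invented for this sketch), not the actual producer code:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy sketch of the "fail sends until metadata is available" option.
// In the real proposal, topics listed in pre.initialize.topics would be
// marked ready during producer construction, bounded by pre.initialize.timeout.
class NonBlockingProducerSketch {
    private final Set<String> initializedTopics = ConcurrentHashMap.newKeySet();

    // Simulates pre-initialization completing for a topic.
    void metadataReady(String topic) {
        initializedTopics.add(topic);
    }

    // send() never blocks waiting for metadata: it fails fast instead.
    void send(String topic, byte[] value) {
        if (!initializedTopics.contains(topic)) {
            throw new IllegalStateException("metadata not available for topic " + topic);
        }
        // ... append to the record accumulator here ...
    }
}
```

The trade-off the JIRA mentions shows up directly: a topic that was never pre-initialized always fails here, which is the "message loss is acceptable in known cases" behavior.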
[jira] [Updated] (KAFKA-2188) JBOD Support
[ https://issues.apache.org/jira/browse/KAFKA-2188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2188: --- Reviewer: Jun Rao (was: Jay Kreps) JBOD Support Key: KAFKA-2188 URL: https://issues.apache.org/jira/browse/KAFKA-2188 Project: Kafka Issue Type: Bug Reporter: Andrii Biletskyi Assignee: Andrii Biletskyi Attachments: KAFKA-2188.patch, KAFKA-2188.patch https://cwiki.apache.org/confluence/display/KAFKA/KIP-18+-+JBOD+Support -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2210: --- Reviewer: Jun Rao KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch This is the first subtask for Kafka-1688. As Part of this jira we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
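The shape of a pluggable authorizer, based on the entity names in the diff above (Authorizer, KafkaPrincipal, Operation, Resource), can be sketched roughly as follows. This is an illustration only, not the KIP-11 API; the real interface uses dedicated types rather than strings:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for a pluggable authorizer interface: the broker
// calls authorize() on every request and plugs in whichever implementation
// is configured.
interface AuthorizerSketch {
    boolean authorize(String principal, String operation, String resource);
}

// A trivial in-memory implementation: allow only (principal, operation,
// resource) triples that were explicitly added as ACLs.
class SimpleAclAuthorizerSketch implements AuthorizerSketch {
    private final Set<String> acls = ConcurrentHashMap.newKeySet();

    void addAcl(String principal, String operation, String resource) {
        acls.add(principal + "|" + operation + "|" + resource);
    }

    @Override
    public boolean authorize(String principal, String operation, String resource) {
        return acls.contains(principal + "|" + operation + "|" + resource);
    }
}
```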
[jira] [Commented] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626663#comment-14626663 ] Parth Brahmbhatt commented on KAFKA-2210: - Updated reviewboard https://reviews.apache.org/r/34492/diff/ against branch origin/trunk KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch, KAFKA-2210_2015-07-14_10:02:19.patch This is the first subtask for Kafka-1688. As Part of this jira we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Build failed in Jenkins: KafkaPreCommit #147
Looks like there is already KAFKA-2055 for the transient ConsumerBounceTest failure. I'll have a look at it. -Jason On Tue, Jul 14, 2015 at 9:02 AM, Guozhang Wang wangg...@gmail.com wrote: I think there is no JIRA ticket created for ConsumerBounceTest yet. We can create a new one for it. On Tue, Jul 14, 2015 at 4:12 AM, Ismael Juma ism...@juma.me.uk wrote: On Tue, Jul 14, 2015 at 7:01 AM, Gwen Shapira csh...@gmail.com wrote: Looks like ConsumerBounceTest is flaky. Anyone knows if there's a JIRA for it? It seemed to be failing more-or-less at random in the last few builds. I haven't seen one. Another test that is quite flaky for me is `SocketServerTest`. It would be really nice to make the tests less flaky. I see that there's a patch for ServerShutdownTest in KAFKA-1858 (originally reported by you Gwen). Maybe worth reviewing and integrating. Best, Ismael -- -- Guozhang
Re: Build failed in Jenkins: KafkaPreCommit #147
Ohhh ... you are right, and I cannot believe it was created by myself, totally forgot about that. On Tue, Jul 14, 2015 at 9:30 AM, Jason Gustafson ja...@confluent.io wrote: Looks like there is already KAFKA-2055 for the transient ConsumerBounceTest failure. I'll have a look at it. -Jason On Tue, Jul 14, 2015 at 9:02 AM, Guozhang Wang wangg...@gmail.com wrote: I think there is no JIRA ticket created for ConsumerBounceTest yet. We can create a new one for it. On Tue, Jul 14, 2015 at 4:12 AM, Ismael Juma ism...@juma.me.uk wrote: On Tue, Jul 14, 2015 at 7:01 AM, Gwen Shapira csh...@gmail.com wrote: Looks like ConsumerBounceTest is flaky. Anyone knows if there's a JIRA for it? It seemed to be failing more-or-less at random in the last few builds. I haven't seen one. Another test that is quite flaky for me is `SocketServerTest`. It would be really nice to make the tests less flaky. I see that there's a patch for ServerShutdownTest in KAFKA-1858 (originally reported by you Gwen). Maybe worth reviewing and integrating. Best, Ismael -- -- Guozhang -- -- Guozhang
[jira] [Updated] (KAFKA-2123) Make new consumer offset commit API use callback + future
[ https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2123: --- Reviewer: Guozhang Wang (was: Jay Kreps) Make new consumer offset commit API use callback + future - Key: KAFKA-2123 URL: https://issues.apache.org/jira/browse/KAFKA-2123 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-2123.patch, KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, KAFKA-2123_2015-05-01_19:33:19.patch, KAFKA-2123_2015-05-04_09:39:50.patch, KAFKA-2123_2015-05-04_22:51:48.patch, KAFKA-2123_2015-05-29_11:11:05.patch, KAFKA-2123_2015-07-11_17:33:59.patch, KAFKA-2123_2015-07-13_18:45:08.patch The current version of the offset commit API in the new consumer is void commit(offsets, commit type) where the commit type is either sync or async. This means you need to use sync if you ever want confirmation that the commit succeeded. Some applications will want to use asynchronous offset commit, but be able to tell when the commit completes. This is basically the same problem that had to be fixed going from old consumer to new consumer and I'd suggest the same fix using a callback + future combination. The new API would be Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); where ConsumerCommitCallback contains a single method: public void onCompletion(Exception exception); We can provide shorthand variants of commit() for eliding the different arguments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
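The control flow of the proposed callback + future combination can be illustrated with plain java.util.concurrent types. Everything here is a stand-in sketch: CommitSketch is invented for this example, the String-keyed offsets map replaces TopicPartition, and the real consumer would complete the future when the coordinator responds rather than inline:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class CommitSketch {
    // Mirrors the single-method callback described in the JIRA.
    interface CommitCallback {
        void onCompletion(Exception exception);
    }

    // Async commit: callers get a future for "wait until done" semantics
    // and can also pass a callback, so both styles are supported by one API.
    Future<Void> commit(Map<String, Long> offsets, CommitCallback callback) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            // ... in the real consumer, an OffsetCommitRequest is sent to the
            // coordinator here; we complete immediately to show the flow ...
            result.complete(null);
            if (callback != null) callback.onCompletion(null);
        } catch (Exception e) {
            result.completeExceptionally(e);
            if (callback != null) callback.onCompletion(e);
        }
        return result;
    }
}
```

A caller that only wants sync behavior just calls `commit(offsets, null).get()`; one that wants fire-and-forget ignores the future and passes a callback (or neither).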
[jira] [Updated] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2210: Attachment: KAFKA-2210_2015-07-14_10:02:19.patch KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch, KAFKA-2210_2015-07-14_10:02:19.patch This is the first subtask for Kafka-1688. As Part of this jira we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 14, 2015, 5:02 p.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description (updated) --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Moving PermissionType to trait instead of enum. Following the convention for defining constants. Adding authorizer.config.path back. Diffs (updated) - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: 
https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Comment Edited] (KAFKA-972) MetadataRequest returns stale list of brokers
[ https://issues.apache.org/jira/browse/KAFKA-972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626807#comment-14626807 ] Ashish K Singh edited comment on KAFKA-972 at 7/14/15 6:22 PM: --- Thanks [~junrao]! was (Author: singhashish): Thanks Jun! MetadataRequest returns stale list of brokers - Key: KAFKA-972 URL: https://issues.apache.org/jira/browse/KAFKA-972 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.0 Reporter: Vinicius Carvalho Assignee: Ashish K Singh Fix For: 0.8.3 Attachments: BrokerMetadataTest.scala, KAFKA-972.patch, KAFKA-972_2015-06-30_18:42:13.patch, KAFKA-972_2015-07-01_01:36:56.patch, KAFKA-972_2015-07-01_01:42:42.patch, KAFKA-972_2015-07-01_08:06:03.patch, KAFKA-972_2015-07-06_23:07:34.patch, KAFKA-972_2015-07-07_10:42:41.patch, KAFKA-972_2015-07-07_23:24:13.patch When we issue an metadatarequest towards the cluster, the list of brokers is stale. I mean, even when a broker is down, it's returned back to the client. 
The following are examples of two invocations one with both brokers online and the second with a broker down: { brokers: [ { nodeId: 0, host: 10.139.245.106, port: 9092, byteLength: 24 }, { nodeId: 1, host: localhost, port: 9093, byteLength: 19 } ], topicMetadata: [ { topicErrorCode: 0, topicName: foozbar, partitions: [ { replicas: [ 0 ], isr: [ 0 ], partitionErrorCode: 0, partitionId: 0, leader: 0, byteLength: 26 }, { replicas: [ 1 ], isr: [ 1 ], partitionErrorCode: 0, partitionId: 1, leader: 1, byteLength: 26 }, { replicas: [ 0 ], isr: [ 0 ], partitionErrorCode: 0, partitionId: 2, leader: 0, byteLength: 26 }, { replicas: [ 1 ], isr: [ 1 ], partitionErrorCode: 0, partitionId: 3, leader: 1, byteLength: 26 }, { replicas: [ 0 ], isr: [ 0 ], partitionErrorCode: 0, partitionId: 4, leader: 0, byteLength: 26 } ], byteLength: 145 } ], responseSize: 200, correlationId: -1000 } { brokers: [ { nodeId: 0, host: 10.139.245.106, port: 9092, byteLength: 24 }, { nodeId: 1, host: localhost, port: 9093, byteLength: 19 } ], topicMetadata: [ { topicErrorCode: 0, topicName: foozbar, partitions: [ { replicas: [ 0 ], isr: [], partitionErrorCode: 5, partitionId: 0, leader: -1, byteLength: 22 }, { replicas: [ 1 ], isr: [ 1 ], partitionErrorCode: 0, partitionId: 1, leader: 1, byteLength: 26 }, { replicas: [ 0 ], isr: [], partitionErrorCode: 5, partitionId: 2, leader: -1,
[jira] [Commented] (KAFKA-972) MetadataRequest returns stale list of brokers
[ https://issues.apache.org/jira/browse/KAFKA-972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626807#comment-14626807 ] Ashish K Singh commented on KAFKA-972: -- Thanks Jun! MetadataRequest returns stale list of brokers - Key: KAFKA-972 URL: https://issues.apache.org/jira/browse/KAFKA-972 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.0 Reporter: Vinicius Carvalho Assignee: Ashish K Singh Fix For: 0.8.3 Attachments: BrokerMetadataTest.scala, KAFKA-972.patch, KAFKA-972_2015-06-30_18:42:13.patch, KAFKA-972_2015-07-01_01:36:56.patch, KAFKA-972_2015-07-01_01:42:42.patch, KAFKA-972_2015-07-01_08:06:03.patch, KAFKA-972_2015-07-06_23:07:34.patch, KAFKA-972_2015-07-07_10:42:41.patch, KAFKA-972_2015-07-07_23:24:13.patch When we issue an metadatarequest towards the cluster, the list of brokers is stale. I mean, even when a broker is down, it's returned back to the client. The following are examples of two invocations one with both brokers online and the second with a broker down: { brokers: [ { nodeId: 0, host: 10.139.245.106, port: 9092, byteLength: 24 }, { nodeId: 1, host: localhost, port: 9093, byteLength: 19 } ], topicMetadata: [ { topicErrorCode: 0, topicName: foozbar, partitions: [ { replicas: [ 0 ], isr: [ 0 ], partitionErrorCode: 0, partitionId: 0, leader: 0, byteLength: 26 }, { replicas: [ 1 ], isr: [ 1 ], partitionErrorCode: 0, partitionId: 1, leader: 1, byteLength: 26 }, { replicas: [ 0 ], isr: [ 0 ], partitionErrorCode: 0, partitionId: 2, leader: 0, byteLength: 26 }, { replicas: [ 1 ], isr: [ 1 ], partitionErrorCode: 0, partitionId: 3, leader: 1, byteLength: 26 }, { replicas: [ 0 ], isr: [ 0 ], partitionErrorCode: 0, partitionId: 4, leader: 0, byteLength: 26 } ], byteLength: 145 } ], responseSize: 200, correlationId: -1000 } { brokers: [ { nodeId: 0, host: 10.139.245.106, port: 9092, byteLength: 24 }, { nodeId: 1, host: localhost, port: 9093, byteLength: 19 } ], topicMetadata: [ { topicErrorCode: 
0, topicName: foozbar, partitions: [ { replicas: [ 0 ], isr: [], partitionErrorCode: 5, partitionId: 0, leader: -1, byteLength: 22 }, { replicas: [ 1 ], isr: [ 1 ], partitionErrorCode: 0, partitionId: 1, leader: 1, byteLength: 26 }, { replicas: [ 0 ], isr: [], partitionErrorCode: 5, partitionId: 2, leader: -1, byteLength: 22 }, { replicas: [
[jira] [Commented] (KAFKA-2205) Generalize TopicConfigManager to handle multiple entity configs
[ https://issues.apache.org/jira/browse/KAFKA-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626814#comment-14626814 ] Aditya Auradkar commented on KAFKA-2205: [~junrao] Thanks! I addressed your remaining comments. Please take a look. Generalize TopicConfigManager to handle multiple entity configs --- Key: KAFKA-2205 URL: https://issues.apache.org/jira/browse/KAFKA-2205 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Labels: quotas Attachments: KAFKA-2205.patch, KAFKA-2205_2015-07-01_18:38:18.patch, KAFKA-2205_2015-07-07_19:12:15.patch, KAFKA-2205_2015-07-14_10:33:47.patch, KAFKA-2205_2015-07-14_10:36:36.patch Acceptance Criteria: - TopicConfigManager should be generalized to handle Topic and Client configs (and any type of config in the future). As described in KIP-21 - Add a ConfigCommand tool to change topic and client configuration -- This message was sent by Atlassian JIRA (v6.3.4#6332)
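The generalization the ticket describes — one config manager keyed by entity type ("topics", "clients", and any future type) instead of a topic-only manager — can be sketched as below. The names are illustrative, not the actual KIP-21 code:

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Toy sketch: config-change notifications carry an entity type and entity
// name, and per-type handlers are registered rather than hard-coded.
class EntityConfigManagerSketch {
    private final Map<String, Map<String, Properties>> configs = new ConcurrentHashMap<>();
    private final Map<String, BiConsumer<String, Properties>> handlers = new ConcurrentHashMap<>();

    // Register a handler for one entity type, e.g. "topics" or "clients".
    void registerHandler(String entityType, BiConsumer<String, Properties> handler) {
        handlers.put(entityType, handler);
    }

    // Simulates a config-change notification for (entityType, entityName).
    void onConfigChange(String entityType, String entityName, Properties props) {
        configs.computeIfAbsent(entityType, t -> new ConcurrentHashMap<>()).put(entityName, props);
        BiConsumer<String, Properties> handler = handlers.get(entityType);
        if (handler != null) handler.accept(entityName, props);
    }

    Properties get(String entityType, String entityName) {
        return configs.getOrDefault(entityType, Map.of()).getOrDefault(entityName, new Properties());
    }
}
```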
Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/#review91638 --- core/src/main/scala/kafka/security/auth/Acl.scala (line 24) https://reviews.apache.org/r/34492/#comment145163 nit: you could swap this line with the previous one and use ''WildcardHost'' instead on a String literal on WildCardPrincipal declaration. - Edward Ribeiro On July 14, 2015, 5:02 p.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 14, 2015, 5:02 p.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Moving PermissionType to trait instead of enum. Following the convention for defining constants. Adding authorizer.config.path back. 
Diffs - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Commented] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627049#comment-14627049 ] Parth Brahmbhatt commented on KAFKA-2210: - Updated reviewboard https://reviews.apache.org/r/34492/diff/ against branch origin/trunk KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch, KAFKA-2210_2015-07-14_10:02:19.patch, KAFKA-2210_2015-07-14_14:13:19.patch This is the first subtask for Kafka-1688. As Part of this jira we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2210: Attachment: KAFKA-2210_2015-07-14_14:13:19.patch KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch, KAFKA-2210_2015-07-14_10:02:19.patch, KAFKA-2210_2015-07-14_14:13:19.patch This is the first subtask for Kafka-1688. As Part of this jira we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] Json libraries for Kafka
Fasterxml/Jackson +1 to that. The scala databinds to case classes are gr8. ~ Joestein On Jul 14, 2015 5:42 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Currently the clients/server mismatch wouldn't be an issue since there are no client-side uses of JSON, right? That said, if Copycat ends up included in Kafka we'll need to provide at least one serializer which would be written in Java and I suspect some people would like JSON to be a candidate for that. I'd personally go with Jackson just because of how widely used it is and so we have one library for both Scala and Java. The use of JSON in the code base isn't terribly complex, so I don't think a specialized API for scala provides much benefit. -Ewen On Mon, Jul 13, 2015 at 2:05 PM, Ismael Juma ism...@juma.me.uk wrote: Hi all, Kafka currently use scala.util.parsing.json.JSON as its json parser and it has a number of issues: * It encourages unsafe casts (returns `Option[Any]`) * It's slow (it relies on parser combinators under the hood) * It's not thread-safe (so external locks are needed to use it in a concurrent environment) * It's deprecated (it should have never been included in the standard library in the first place) KAFKA-1595[1] has been filed to track this issue. I initially proposed a change using spray-json's AST with the jawn parser[2]. Gwen expressed some reservations about the choice (a previous discussion had concluded that Jackson should be used instead) and asked me to raise the issue in the mailing list[3]. In order to have a fair comparison, I implemented the change using Jackson as well[4]. I paste part of the commit message: A thin wrapper over Jackson's Tree Model API is used as the replacement. This wrapper increases safety while providing a simple, but powerful API through the usage of the `DecodeJson` type class. Even though this has a maintenance cost, it makes the API much more convenient from Scala. A number of tests were added to verify the behaviour of this wrapper. 
The Scala module for Jackson doesn't provide any help for our current usage, so we don't depend on it. A comparison between the two approaches as I see it: Similarities: 1. The code for users of the JSON library is similar 2. No third-party dependencies 3. Good performance In favour of using Jackson: 1. Same library for client and broker 2. Widely used In favour of using spray-json and jawn: 1. Simple type class based API is included and it has a number of nice features: 1. Support for parsing into case classes (we don't use this yet, but we could use it to make the code safer and more readable in some cases)[5]. 2. Very little reflection used (only for retrieving case classes field names). 3. Write support (could replace our `Json.encode` method). 2. Less code to maintain (ie we don't need a wrapper to make it nice to use from Scala) 3. No memory overhead from wrapping the Jackson classes (probably not a big deal) I am happy to go either way as both approaches have been implemented and I am torn between the options. What do you think? Best, Ismael [1] https://issues.apache.org/jira/browse/KAFKA-1595 [2] https://github.com/ijuma/kafka/commit/80974afefc00eb6313a7357e7942d5d86ffce84d [3] https://issues.apache.org/jira/browse/KAFKA-1595?focusedCommentId=14512881page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14512881 [4] https://github.com/ijuma/kafka/commit/4ca0feb37e8be2d388b60efacc19bc6788b6 [5] The Scala module for Jackson (which is not being used in the commit above) also supports this, but it uses a reflection-based approach instead of type classes. -- Thanks, Ewen
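The `DecodeJson` type-class idea above translates only loosely to Java, but the safety argument — typed Optional results instead of the unchecked casts that `scala.util.parsing.json.JSON` encourages — can be shown with a toy decoder over an untyped tree. The Map-based tree stands in for a real parser's output (e.g. Jackson's JsonNode); none of this is the actual wrapper code from the commits linked below:

```java
import java.util.Map;
import java.util.Optional;

class JsonWrapperSketch {
    // A decoder knows how to read one type out of an untyped node;
    // a failed decode is an empty Optional, never a ClassCastException.
    interface Decoder<T> {
        Optional<T> decode(Object node);
    }

    static final Decoder<String> STRING =
        node -> node instanceof String ? Optional.of((String) node) : Optional.empty();
    static final Decoder<Long> LONG =
        node -> node instanceof Number ? Optional.of(((Number) node).longValue()) : Optional.empty();

    // Safe field lookup: missing fields and wrong types both yield empty.
    static <T> Optional<T> field(Map<String, Object> obj, String name, Decoder<T> dec) {
        return Optional.ofNullable(obj.get(name)).flatMap(dec::decode);
    }
}
```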
Re: [VOTE] KIP-26 Add Copycat connector framework for data import/export
+1 (binding) Thanks Ewen for taking on something that the Kafka project has long waited for! On Tue, Jul 14, 2015 at 2:58 PM, Jay Kreps j...@confluent.io wrote: +1 Super excited! -Jay On Tue, Jul 14, 2015 at 2:09 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Hi all, Let's start a vote on KIP-26: Add Copycat connector framework for data import/export For reference, here's the wiki: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767 And the mailing list thread (split across two months): http://mail-archives.apache.org/mod_mbox/kafka-dev/201506.mbox/%3CCAE1jLMOEJjnorFK5CtR3g-n%3Dm_AkrFsYeccsB4QimTRfGBrAGQ%40mail.gmail.com%3E http://mail-archives.apache.org/mod_mbox/kafka-dev/201507.mbox/%3CCAHwHRrUeNh%2BnCHwCTUCrcipHM3Po0ECUysO%2B%3DX3nwUeOGrcgdw%40mail.gmail.com%3E Just to clarify since this is a bit different from the KIPs voted on so far, the KIP just covers including Copycat in Kafka (rather than having it as a separate project). While the KIP aimed to be clear about the exact scope, the details require further discussion. The aim is to include some connectors as well, at a minimum for demonstration purposes, but the expectation is that connector development will, by necessity, be federated. I'll kick it off with a +1 (non-binding). -- Thanks, Ewen -- Thanks, Neha
[jira] [Commented] (KAFKA-1977) Make logEndOffset available in the Zookeeper consumer
[ https://issues.apache.org/jira/browse/KAFKA-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14626900#comment-14626900 ] James Cheng commented on KAFKA-1977: Will maxEndOffset be exposed in the new Java consumer? Make logEndOffset available in the Zookeeper consumer - Key: KAFKA-1977 URL: https://issues.apache.org/jira/browse/KAFKA-1977 Project: Kafka Issue Type: Improvement Components: core Reporter: Will Funnell Priority: Minor Attachments: Make_logEndOffset_available_in_the_Zookeeper_consumer.patch The requirement is to create a snapshot from the Kafka topic but NOT do continual reads after that point. For example you might be creating a backup of the data to a file. In order to achieve that, a recommended solution by Joel Koshy and Jay Kreps was to expose the high watermark, as maxEndOffset, from the FetchResponse object through to each MessageAndMetadata object in order to be aware when the consumer has reached the end of each partition. The submitted patch achieves this by adding the maxEndOffset to the PartitionTopicInfo, which is updated when a new message arrives in the ConsumerFetcherThread and then exposed in MessageAndMetadata. See here for discussion: http://search-hadoop.com/m/4TaT4TpJy71 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
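The snapshot use case — consume each partition up to the high watermark it exposes as maxEndOffset, then stop — reduces to the completion check sketched below. The types are simplified stand-ins (int partition ids, a plain offset map) for MessageAndMetadata, invented for this illustration:

```java
import java.util.HashMap;
import java.util.Map;

class SnapshotSketch {
    private final Map<Integer, Long> maxEndOffsets; // partition -> high watermark
    private final Map<Integer, Long> consumed = new HashMap<>();

    SnapshotSketch(Map<Integer, Long> maxEndOffsets) {
        this.maxEndOffsets = maxEndOffsets;
    }

    // Record a consumed message; returns true while the snapshot is incomplete,
    // so a read loop can use it directly as its continue condition.
    boolean onMessage(int partition, long offset) {
        consumed.put(partition, offset + 1); // next offset to fetch
        return !complete();
    }

    // The snapshot is done once every partition has reached its end offset.
    boolean complete() {
        return maxEndOffsets.entrySet().stream()
            .allMatch(e -> consumed.getOrDefault(e.getKey(), 0L) >= e.getValue());
    }
}
```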
Re: [VOTE] KIP-26 Add Copycat connector framework for data import/export
+1 Super excited! -Jay On Tue, Jul 14, 2015 at 2:09 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Hi all, Let's start a vote on KIP-26: Add Copycat connector framework for data import/export For reference, here's the wiki: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767 And the mailing list thread (split across two months): http://mail-archives.apache.org/mod_mbox/kafka-dev/201506.mbox/%3CCAE1jLMOEJjnorFK5CtR3g-n%3Dm_AkrFsYeccsB4QimTRfGBrAGQ%40mail.gmail.com%3E http://mail-archives.apache.org/mod_mbox/kafka-dev/201507.mbox/%3CCAHwHRrUeNh%2BnCHwCTUCrcipHM3Po0ECUysO%2B%3DX3nwUeOGrcgdw%40mail.gmail.com%3E Just to clarify since this is a bit different from the KIPs voted on so far, the KIP just covers including Copycat in Kafka (rather than having it as a separate project). While the KIP aimed to be clear about the exact scope, the details require further discussion. The aim is to include some connectors as well, at a minimum for demonstration purposes, but the expectation is that connector development will, by necessity, be federated. I'll kick it off with a +1 (non-binding). -- Thanks, Ewen
Re: [DISCUSS] Json libraries for Kafka
Maybe after the existing scala clients are deprecated. ~ Joestein On Jul 14, 2015 6:04 PM, Jay Kreps j...@confluent.io wrote: Is this going to become a dependency for core and then transitively for the old clients? The current json library is definitely not great, but it does parse json and it's not used in any context where performance is a concern. Because the older clients aren't well modularized, adding core dependencies sucks these up into every user of the clients. This particularly becomes a problem with common libraries since it will turn out we require version X but other code in the same app requires version Y. The new clients fix this issue but not everyone is using them yet. If there is a pressing need maybe we should just do it and people who have problems can just hack their build to exclude the dependency (since the client code won't need it). If not it might be better just to leave it for a bit until we have at least get a couple releases with both the new producer and the new consumer. -Jay On Mon, Jul 13, 2015 at 2:05 PM, Ismael Juma ism...@juma.me.uk wrote: Hi all, Kafka currently use scala.util.parsing.json.JSON as its json parser and it has a number of issues: * It encourages unsafe casts (returns `Option[Any]`) * It's slow (it relies on parser combinators under the hood) * It's not thread-safe (so external locks are needed to use it in a concurrent environment) * It's deprecated (it should have never been included in the standard library in the first place) KAFKA-1595[1] has been filed to track this issue. I initially proposed a change using spray-json's AST with the jawn parser[2]. Gwen expressed some reservations about the choice (a previous discussion had concluded that Jackson should be used instead) and asked me to raise the issue in the mailing list[3]. In order to have a fair comparison, I implemented the change using Jackson as well[4]. 
I paste part of the commit message: A thin wrapper over Jackson's Tree Model API is used as the replacement. This wrapper increases safety while providing a simple, but powerful API through the usage of the `DecodeJson` type class. Even though this has a maintenance cost, it makes the API much more convenient from Scala. A number of tests were added to verify the behaviour of this wrapper. The Scala module for Jackson doesn't provide any help for our current usage, so we don't depend on it. A comparison between the two approaches as I see it: Similarities: 1. The code for users of the JSON library is similar 2. No third-party dependencies 3. Good performance In favour of using Jackson: 1. Same library for client and broker 2. Widely used In favour of using spray-json and jawn: 1. Simple type class based API is included and it has a number of nice features: 1. Support for parsing into case classes (we don't use this yet, but we could use it to make the code safer and more readable in some cases)[5]. 2. Very little reflection used (only for retrieving case classes field names). 3. Write support (could replace our `Json.encode` method). 2. Less code to maintain (ie we don't need a wrapper to make it nice to use from Scala) 3. No memory overhead from wrapping the Jackson classes (probably not a big deal) I am happy to go either way as both approaches have been implemented and I am torn between the options. What do you think? 
Best, Ismael [1] https://issues.apache.org/jira/browse/KAFKA-1595 [2] https://github.com/ijuma/kafka/commit/80974afefc00eb6313a7357e7942d5d86ffce84d [3] https://issues.apache.org/jira/browse/KAFKA-1595?focusedCommentId=14512881&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14512881 [4] https://github.com/ijuma/kafka/commit/4ca0feb37e8be2d388b60efacc19bc6788b6 [5] The Scala module for Jackson (which is not being used in the commit above) also supports this, but it uses a reflection-based approach instead of type classes.
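The `DecodeJson` type-class idea described above can be sketched without either library. The tiny AST and all names below are hypothetical stand-ins for Jackson's Tree Model, not Kafka code; the point is only how a type class turns the old parser's unsafe `Option[Any]` casts into typed `Either` results:

```scala
object JsonSketch {
  // Hypothetical minimal AST standing in for Jackson's Tree Model.
  sealed trait JsonValue
  case class JsonString(value: String) extends JsonValue
  case class JsonNumber(value: Long) extends JsonValue
  case class JsonObject(fields: Map[String, JsonValue]) extends JsonValue

  // The type class: one decoder instance per target type.
  trait DecodeJson[T] {
    def decode(json: JsonValue): Either[String, T]
  }

  implicit val decodeString: DecodeJson[String] = new DecodeJson[String] {
    def decode(json: JsonValue): Either[String, String] = json match {
      case JsonString(s) => Right(s)
      case other         => Left(s"expected string, got $other")
    }
  }

  implicit val decodeLong: DecodeJson[Long] = new DecodeJson[Long] {
    def decode(json: JsonValue): Either[String, Long] = json match {
      case JsonNumber(n) => Right(n)
      case other         => Left(s"expected number, got $other")
    }
  }

  // Typed field lookup: a missing field or wrong type becomes an error
  // value rather than a ClassCastException at some later point.
  def field[T](obj: JsonObject, name: String)(implicit d: DecodeJson[T]): Either[String, T] =
    obj.fields.get(name) match {
      case Some(v) => d.decode(v)
      case None    => Left("missing field: " + name)
    }
}
```

Instances for case classes (the "parsing into case classes" point above) would compose these primitives in the same style.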
[jira] [Commented] (KAFKA-2055) ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627041#comment-14627041 ] Guozhang Wang commented on KAFKA-2055: -- [~lvfangmin] Thanks for the patch, I agree this is a common scenario in which the test could fail. More generally, during a leader migration, the HW of the partition may go back a little bit on the new leader before it catches up to the old value and moves forward. And since we use HW to guard the Fetch / ListOffset requests from clients, it could cause unexpected behaviors like the above. As for fixing this test itself, I think adding some timing logic is OK; it would just be better to use {code} TestUtils.waitUntilTrue(condition: () => Boolean, msg: String, waitTime: Long = 5000L) {code} instead of Thread.sleep(). [~hachikuji] can help review the next patch if you have time, or we can also go ahead and fix it right away. For fixing this issue in a general way, I have created KAFKA-2334 along with a possible solution. 
ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure Key: KAFKA-2055 URL: https://issues.apache.org/jira/browse/KAFKA-2055 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Fangmin Lv Labels: newbie Attachments: KAFKA-2055.patch {code} kafka.api.ConsumerBounceTest testSeekAndCommitWithBrokerFailures FAILED java.lang.AssertionError: expected:<1000> but was:<976> at org.junit.Assert.fail(Assert.java:92) at org.junit.Assert.failNotEquals(Assert.java:689) at org.junit.Assert.assertEquals(Assert.java:127) at org.junit.Assert.assertEquals(Assert.java:514) at org.junit.Assert.assertEquals(Assert.java:498) at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117) at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98) kafka.api.ConsumerBounceTest testSeekAndCommitWithBrokerFailures FAILED java.lang.AssertionError: expected:<1000> but was:<913> at org.junit.Assert.fail(Assert.java:92) at org.junit.Assert.failNotEquals(Assert.java:689) at org.junit.Assert.assertEquals(Assert.java:127) at org.junit.Assert.assertEquals(Assert.java:514) at org.junit.Assert.assertEquals(Assert.java:498) at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117) at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
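Guozhang's suggestion above (poll a condition with a `waitUntilTrue` helper instead of a fixed `Thread.sleep`) can be sketched like this; the helper below is a simplified stand-in for the real `TestUtils.waitUntilTrue`, not Kafka's implementation:

```scala
object WaitSketch {
  // Poll `condition` until it holds or `waitTimeMs` elapses; only then fail.
  // Unlike a fixed Thread.sleep, this returns as soon as the condition is
  // true and tolerates runs where the broker bounce takes longer than usual.
  def waitUntilTrue(condition: () => Boolean,
                    msg: String,
                    waitTimeMs: Long = 5000L,
                    pauseMs: Long = 10L): Unit = {
    val deadline = System.currentTimeMillis() + waitTimeMs
    while (!condition()) {
      if (System.currentTimeMillis() > deadline)
        throw new AssertionError(msg)
      Thread.sleep(pauseMs)
    }
  }
}
```

A test would call, e.g., `WaitSketch.waitUntilTrue(() => consumedCount >= 1000, "records not consumed in time")` rather than sleeping and asserting.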
Re: Review Request 36333: Patch for KAFKA-2123
On July 14, 2015, 6 p.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java, line 274 https://reviews.apache.org/r/36333/diff/2-3/?file=1009096#file1009096line274 Seems like we're not handling this anymore? sendListOffsetRequest will return this type of error if we don't have metadata, the leader is unknown, the topic partition doesn't exist, or the broker isn't the leader for the partition. I think this is easy to miss because there's a number of layers in this class that don't get tested directly, so there are a lot of scenarios that the unit test probably isn't covering now. Maybe fix up handling of the exception for now and file a jira to follow up with better unit tests, especially if we think this code is finally settling down? Haha, I don't think I'll be able to persuade Guozhang to commit another refactor, so I think this is it. I agree on the need for more unit tests for Fetcher. I'll create a new jira. - Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91646 --- On July 14, 2015, 8:21 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 14, 2015, 8:21 p.m.) Review request for kafka. 
Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments KAFKA-2123; more review fixes KAFKA-2123; refresh metadata on listOffset failure Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
Re: [VOTE] KIP-26 Add Copycat connector framework for data import/export
+1 (binding) ~ Joe Stein - - - - - - - - - - - - - - - - - - - http://www.elodina.net http://www.stealth.ly - - - - - - - - - - - - - - - - - - - On Tue, Jul 14, 2015 at 5:09 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Hi all, Let's start a vote on KIP-26: Add Copycat connector framework for data import/export For reference, here's the wiki: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767 And the mailing list thread (split across two months): http://mail-archives.apache.org/mod_mbox/kafka-dev/201506.mbox/%3CCAE1jLMOEJjnorFK5CtR3g-n%3Dm_AkrFsYeccsB4QimTRfGBrAGQ%40mail.gmail.com%3E http://mail-archives.apache.org/mod_mbox/kafka-dev/201507.mbox/%3CCAHwHRrUeNh%2BnCHwCTUCrcipHM3Po0ECUysO%2B%3DX3nwUeOGrcgdw%40mail.gmail.com%3E Just to clarify since this is a bit different from the KIPs voted on so far, the KIP just covers including Copycat in Kafka (rather than having it as a separate project). While the KIP aimed to be clear about the exact scope, the details require further discussion. The aim is to include some connectors as well, at a minimum for demonstration purposes, but the expectation is that connector development will, by necessity, be federated. I'll kick it off with a +1 (non-binding). -- Thanks, Ewen
Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 14, 2015, 9:13 p.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Moving PermissionType to trait instead of enum. Following the convention for defining constants. Adding authorizer.config.path back. Diffs (updated) - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: https://reviews.apache.org/r/34492/diff/ 
Testing --- Thanks, Parth Brahmbhatt
Re: Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91651 --- LGTM overall, just some minor comments below. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 815) https://reviews.apache.org/r/36333/#comment145173 Is this comment accurate? Could a sync commit throw exception when it is no longer retriable on returned Future? clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 1071) https://reviews.apache.org/r/36333/#comment145207 nit: not capitalizing the first letter just for comment consistency? clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (line 175) https://reviews.apache.org/r/36333/#comment145218 I think for in-function comments we will not capitalize comments, same below. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 186) https://reviews.apache.org/r/36333/#comment145216 decapitalize ensure that? clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 222) https://reviews.apache.org/r/36333/#comment145215 missing one @param. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 234) https://reviews.apache.org/r/36333/#comment145219 Same here and below regarding comments. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 615) https://reviews.apache.org/r/36333/#comment145226 nit: Move this class right after sendOffsetCommitRequest()? clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java (line 41) https://reviews.apache.org/r/36333/#comment145227 Comment: remove all instances of the task in the queue? clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (line 144) https://reviews.apache.org/r/36333/#comment145229 Decapitalize comments, same below. 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java (line 46) https://reviews.apache.org/r/36333/#comment145233 Maybe rename to testSend(...) and below just for naming consistency? clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java (line 286) https://reviews.apache.org/r/36333/#comment145234 Did you check the expected tag works here? I have seen in some old junit versions the test will still pass even if the exception is not thrown. clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java (line 62) https://reviews.apache.org/r/36333/#comment145235 Rename to testRemove? clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java (line 63) https://reviews.apache.org/r/36333/#comment145236 testReset? clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java (line 23) https://reviews.apache.org/r/36333/#comment145237 testCompose... ? Same below. - Guozhang Wang On July 14, 2015, 8:21 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 14, 2015, 8:21 p.m.) Review request for kafka. 
Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments KAFKA-2123; more review fixes KAFKA-2123; refresh metadata on listOffset failure Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION
[jira] [Updated] (KAFKA-2300) Error in controller log when broker tries to rejoin cluster
[ https://issues.apache.org/jira/browse/KAFKA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bob Cotton updated KAFKA-2300: -- Attachment: KAFKA-2300-controller-logs.tar.gz Controller logs Error in controller log when broker tries to rejoin cluster --- Key: KAFKA-2300 URL: https://issues.apache.org/jira/browse/KAFKA-2300 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Johnny Brown Assignee: Flavio Junqueira Attachments: KAFKA-2300-controller-logs.tar.gz, KAFKA-2300.patch Hello Kafka folks, We are having an issue where a broker attempts to join the cluster after being restarted, but is never added to the ISR for its assigned partitions. This is a three-node cluster, and the controller is broker 2. When broker 1 starts, we see the following message in broker 2's controller.log. {{ [2015-06-23 13:57:16,535] ERROR [BrokerChangeListener on Controller 2]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener) java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. 
Some UpdateMetadata state changes Map(2 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)), 1 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)), 3 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1))) might be lost at kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:202) at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:974) at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:399) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:371) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) }} {{prod-sver-end}} is a topic we previously deleted. 
It seems some remnant of it persists in the controller's memory, causing an exception which interrupts the state change triggered by the broker startup. Has anyone seen something like this? Any idea what's happening here? Any information would be greatly appreciated. Thanks, Johnny -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2123) Make new consumer offset commit API use callback + future
[ https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14626987#comment-14626987 ] Jason Gustafson commented on KAFKA-2123: Updated reviewboard https://reviews.apache.org/r/36333/diff/ against branch upstream/trunk Make new consumer offset commit API use callback + future - Key: KAFKA-2123 URL: https://issues.apache.org/jira/browse/KAFKA-2123 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-2123.patch, KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, KAFKA-2123_2015-05-01_19:33:19.patch, KAFKA-2123_2015-05-04_09:39:50.patch, KAFKA-2123_2015-05-04_22:51:48.patch, KAFKA-2123_2015-05-29_11:11:05.patch, KAFKA-2123_2015-07-11_17:33:59.patch, KAFKA-2123_2015-07-13_18:45:08.patch, KAFKA-2123_2015-07-14_13:20:25.patch The current version of the offset commit API in the new consumer is void commit(offsets, commit type) where the commit type is either sync or async. This means you need to use sync if you ever want confirmation that the commit succeeded. Some applications will want to use asynchronous offset commit, but be able to tell when the commit completes. This is basically the same problem that had to be fixed going from old consumer -> new consumer and I'd suggest the same fix using a callback + future combination. The new API would be Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); where ConsumerCommitCallback contains a single method: public void onCompletion(Exception exception); We can provide shorthand variants of commit() for eliding the different arguments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2123) Make new consumer offset commit API use callback + future
[ https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-2123: --- Attachment: KAFKA-2123_2015-07-14_13:20:25.patch Make new consumer offset commit API use callback + future - Key: KAFKA-2123 URL: https://issues.apache.org/jira/browse/KAFKA-2123 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-2123.patch, KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, KAFKA-2123_2015-05-01_19:33:19.patch, KAFKA-2123_2015-05-04_09:39:50.patch, KAFKA-2123_2015-05-04_22:51:48.patch, KAFKA-2123_2015-05-29_11:11:05.patch, KAFKA-2123_2015-07-11_17:33:59.patch, KAFKA-2123_2015-07-13_18:45:08.patch, KAFKA-2123_2015-07-14_13:20:25.patch The current version of the offset commit API in the new consumer is void commit(offsets, commit type) where the commit type is either sync or async. This means you need to use sync if you ever want confirmation that the commit succeeded. Some applications will want to use asynchronous offset commit, but be able to tell when the commit completes. This is basically the same problem that had to be fixed going from old consumer -> new consumer and I'd suggest the same fix using a callback + future combination. The new API would be Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); where ConsumerCommitCallback contains a single method: public void onCompletion(Exception exception); We can provide shorthand variants of commit() for eliding the different arguments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
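The callback-plus-future shape proposed in the ticket can be sketched as follows. This is a toy model, not the real client: `TopicPartition`, the in-memory "commit" map, and the synchronous completion are stand-ins for what is actually an asynchronous OffsetCommitRequest over the network:

```scala
import java.util.concurrent.CompletableFuture

object CommitSketch {
  case class TopicPartition(topic: String, partition: Int)

  // Single-method callback, as proposed: exception is null on success.
  trait ConsumerCommitCallback {
    def onCompletion(exception: Exception): Unit
  }

  // Stand-in for broker-side committed offsets.
  val committed = scala.collection.mutable.Map[TopicPartition, Long]()

  // Proposed shape: an async commit that both completes a Future and fires
  // the callback, so a caller can block on the Future for sync semantics
  // or rely on the callback for fire-and-forget with notification.
  def commit(offsets: Map[TopicPartition, Long],
             callback: ConsumerCommitCallback): CompletableFuture[Void] = {
    val future = new CompletableFuture[Void]()
    committed ++= offsets // stand-in for the network round trip
    if (callback != null) callback.onCompletion(null)
    future.complete(null)
    future
  }
}
```

The "shorthand variants" mentioned above would then just delegate, e.g. `commit(offsets)` calling `commit(offsets, null)`.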
[jira] [Created] (KAFKA-2334) Prevent HW from going back during leader failover
Guozhang Wang created KAFKA-2334: Summary: Prevent HW from going back during leader failover Key: KAFKA-2334 URL: https://issues.apache.org/jira/browse/KAFKA-2334 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Fix For: 0.9.0 Consider the following scenario: 0. Kafka uses a replication factor of 2, with broker B1 as the leader and B2 as the follower. 1. A producer keeps sending to Kafka with ack=-1. 2. A consumer repeatedly issues ListOffset requests to Kafka. And the following sequence: 0. B1's current log-end-offset (LEO) is 0, HW-offset 0; and the same for B2. 1. B1 receives a ProduceRequest of 100 messages, appends them to its local log (LEO becomes 100) and holds the request in purgatory. 2. B1 receives a FetchRequest starting at offset 0 from follower B2, and returns the 100 messages. 3. B2 appends the received messages to its local log (LEO becomes 100). 4. B1 receives another FetchRequest starting at offset 100 from B2, learns that B2's LEO has caught up to 100, hence updates its own HW, satisfies the ProduceRequest in purgatory, and sends the FetchResponse with HW 100 back to B2 ASYNCHRONOUSLY. 5. B1 successfully sends the ProduceResponse to the producer and then fails, so the FetchResponse does not reach B2, whose HW remains 0. From the consumer's point of view, it could first see the latest offset of 100 (from B1), then see the latest offset of 0 (from B2), and then watch the latest offset gradually catch up to 100. This is because we use HW to guard the ListOffset and Fetch-from-ordinary-consumer paths. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
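The failover sequence above can be traced with a toy model of the two replicas. This is a sketch of the HW/LEO bookkeeping only, nothing like the real ReplicaManager; it just shows why the new leader B2 ends up with a stale HW of 0 while B1 had already advanced to 100:

```scala
object HwSketch {
  // Toy replica state: log-end-offset and high watermark only.
  final class Replica { var leo: Long = 0L; var hw: Long = 0L }

  // Replays steps 0-5 from the scenario and returns (B1's HW, B2's HW)
  // at the moment B1 fails.
  def scenario(): (Long, Long) = {
    val b1 = new Replica // leader
    val b2 = new Replica // follower
    b1.leo = 100L                    // 1. leader appends 100 messages
    b2.leo = b1.leo                  // 2-3. follower fetches from 0, appends
    b1.hw = math.min(b1.leo, b2.leo) // 4. fetch at offset 100 advances leader HW
    // 5. B1 acks the producer, then fails before the FetchResponse carrying
    //    HW=100 reaches B2, so the new leader B2 still has the stale HW.
    (b1.hw, b2.hw)
  }
}
```

A consumer guarded by HW would therefore see the latest offset drop from 100 to 0 after the failover, exactly as described.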
Re: [DISCUSS] Json libraries for Kafka
Currently the clients/server mismatch wouldn't be an issue since there are no client-side uses of JSON, right? That said, if Copycat ends up included in Kafka we'll need to provide at least one serializer which would be written in Java and I suspect some people would like JSON to be a candidate for that. I'd personally go with Jackson just because of how widely used it is and so we have one library for both Scala and Java. The use of JSON in the code base isn't terribly complex, so I don't think a specialized API for scala provides much benefit. -Ewen On Mon, Jul 13, 2015 at 2:05 PM, Ismael Juma ism...@juma.me.uk wrote: Hi all, Kafka currently use scala.util.parsing.json.JSON as its json parser and it has a number of issues: * It encourages unsafe casts (returns `Option[Any]`) * It's slow (it relies on parser combinators under the hood) * It's not thread-safe (so external locks are needed to use it in a concurrent environment) * It's deprecated (it should have never been included in the standard library in the first place) KAFKA-1595[1] has been filed to track this issue. I initially proposed a change using spray-json's AST with the jawn parser[2]. Gwen expressed some reservations about the choice (a previous discussion had concluded that Jackson should be used instead) and asked me to raise the issue in the mailing list[3]. In order to have a fair comparison, I implemented the change using Jackson as well[4]. I paste part of the commit message: A thin wrapper over Jackson's Tree Model API is used as the replacement. This wrapper increases safety while providing a simple, but powerful API through the usage of the `DecodeJson` type class. Even though this has a maintenance cost, it makes the API much more convenient from Scala. A number of tests were added to verify the behaviour of this wrapper. The Scala module for Jackson doesn't provide any help for our current usage, so we don't depend on it. A comparison between the two approaches as I see it: Similarities: 1. 
The code for users of the JSON library is similar 2. No third-party dependencies 3. Good performance In favour of using Jackson: 1. Same library for client and broker 2. Widely used In favour of using spray-json and jawn: 1. Simple type class based API is included and it has a number of nice features: 1. Support for parsing into case classes (we don't use this yet, but we could use it to make the code safer and more readable in some cases)[5]. 2. Very little reflection used (only for retrieving case classes field names). 3. Write support (could replace our `Json.encode` method). 2. Less code to maintain (ie we don't need a wrapper to make it nice to use from Scala) 3. No memory overhead from wrapping the Jackson classes (probably not a big deal) I am happy to go either way as both approaches have been implemented and I am torn between the options. What do you think? Best, Ismael [1] https://issues.apache.org/jira/browse/KAFKA-1595 [2] https://github.com/ijuma/kafka/commit/80974afefc00eb6313a7357e7942d5d86ffce84d [3] https://issues.apache.org/jira/browse/KAFKA-1595?focusedCommentId=14512881page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14512881 [4] https://github.com/ijuma/kafka/commit/4ca0feb37e8be2d388b60efacc19bc6788b6 [5] The Scala module for Jackson (which is not being used in the commit above) also supports this, but it uses a reflection-based approach instead of type classes. -- Thanks, Ewen
[jira] [Updated] (KAFKA-2055) ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2055: - Reviewer: Jason Gustafson ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure Key: KAFKA-2055 URL: https://issues.apache.org/jira/browse/KAFKA-2055 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Fangmin Lv Labels: newbie Attachments: KAFKA-2055.patch {code} kafka.api.ConsumerBounceTest testSeekAndCommitWithBrokerFailures FAILED java.lang.AssertionError: expected:<1000> but was:<976> at org.junit.Assert.fail(Assert.java:92) at org.junit.Assert.failNotEquals(Assert.java:689) at org.junit.Assert.assertEquals(Assert.java:127) at org.junit.Assert.assertEquals(Assert.java:514) at org.junit.Assert.assertEquals(Assert.java:498) at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117) at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98) kafka.api.ConsumerBounceTest testSeekAndCommitWithBrokerFailures FAILED java.lang.AssertionError: expected:<1000> but was:<913> at org.junit.Assert.fail(Assert.java:92) at org.junit.Assert.failNotEquals(Assert.java:689) at org.junit.Assert.assertEquals(Assert.java:127) at org.junit.Assert.assertEquals(Assert.java:514) at org.junit.Assert.assertEquals(Assert.java:498) at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117) at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] Json libraries for Kafka
Is this going to become a dependency for core and then transitively for the old clients? The current json library is definitely not great, but it does parse json and it's not used in any context where performance is a concern. Because the older clients aren't well modularized, adding core dependencies sucks these up into every user of the clients. This particularly becomes a problem with common libraries since it will turn out we require version X but other code in the same app requires version Y. The new clients fix this issue but not everyone is using them yet. If there is a pressing need maybe we should just do it and people who have problems can just hack their build to exclude the dependency (since the client code won't need it). If not it might be better just to leave it for a bit until we have at least a couple of releases with both the new producer and the new consumer. -Jay On Mon, Jul 13, 2015 at 2:05 PM, Ismael Juma ism...@juma.me.uk wrote: Hi all, Kafka currently uses scala.util.parsing.json.JSON as its json parser and it has a number of issues: * It encourages unsafe casts (returns `Option[Any]`) * It's slow (it relies on parser combinators under the hood) * It's not thread-safe (so external locks are needed to use it in a concurrent environment) * It's deprecated (it should have never been included in the standard library in the first place) KAFKA-1595[1] has been filed to track this issue. I initially proposed a change using spray-json's AST with the jawn parser[2]. Gwen expressed some reservations about the choice (a previous discussion had concluded that Jackson should be used instead) and asked me to raise the issue in the mailing list[3]. In order to have a fair comparison, I implemented the change using Jackson as well[4]. I paste part of the commit message: A thin wrapper over Jackson's Tree Model API is used as the replacement. 
This wrapper increases safety while providing a simple, but powerful API through the usage of the `DecodeJson` type class. Even though this has a maintenance cost, it makes the API much more convenient from Scala. A number of tests were added to verify the behaviour of this wrapper. The Scala module for Jackson doesn't provide any help for our current usage, so we don't depend on it. A comparison between the two approaches as I see it: Similarities: 1. The code for users of the JSON library is similar 2. No third-party dependencies 3. Good performance In favour of using Jackson: 1. Same library for client and broker 2. Widely used In favour of using spray-json and jawn: 1. Simple type class based API is included and it has a number of nice features: 1. Support for parsing into case classes (we don't use this yet, but we could use it to make the code safer and more readable in some cases)[5]. 2. Very little reflection used (only for retrieving case classes field names). 3. Write support (could replace our `Json.encode` method). 2. Less code to maintain (ie we don't need a wrapper to make it nice to use from Scala) 3. No memory overhead from wrapping the Jackson classes (probably not a big deal) I am happy to go either way as both approaches have been implemented and I am torn between the options. What do you think? Best, Ismael [1] https://issues.apache.org/jira/browse/KAFKA-1595 [2] https://github.com/ijuma/kafka/commit/80974afefc00eb6313a7357e7942d5d86ffce84d [3] https://issues.apache.org/jira/browse/KAFKA-1595?focusedCommentId=14512881page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14512881 [4] https://github.com/ijuma/kafka/commit/4ca0feb37e8be2d388b60efacc19bc6788b6 [5] The Scala module for Jackson (which is not being used in the commit above) also supports this, but it uses a reflection-based approach instead of type classes.
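The `DecodeJson` type class described above can be illustrated with a small self-contained sketch. All names here are hypothetical: the actual proposal wraps Jackson's `JsonNode` tree, whereas plain `Map`s stand in for the parsed tree below so the example has no dependencies. The point is that each decoder instance knows how to extract one typed value from the tree, so unsafe `Option[Any]`-style casts disappear from call sites.

```java
import java.util.Map;

// Hypothetical illustration of the "thin wrapper + type class" idea from the
// thread. Plain Maps stand in for Jackson's JsonNode to keep it self-contained.
public class DecodeJsonSketch {

    public interface DecodeJson<T> {
        T decode(Object node); // throws IllegalArgumentException on a type mismatch
    }

    public static final DecodeJson<String> STRING = node -> {
        if (node instanceof String) return (String) node;
        throw new IllegalArgumentException("expected string, got " + node);
    };

    public static final DecodeJson<Integer> INT = node -> {
        if (node instanceof Integer) return (Integer) node;
        throw new IllegalArgumentException("expected int, got " + node);
    };

    // Decode a named field of a JSON object with the given element decoder.
    public static <T> T field(Object node, String name, DecodeJson<T> dec) {
        return dec.decode(((Map<?, ?>) node).get(name));
    }

    public static void main(String[] args) {
        Map<String, Object> parsed = Map.of("host", "broker1", "port", 9092);
        // prints broker1:9092
        System.out.println(field(parsed, "host", STRING) + ":" + field(parsed, "port", INT));
    }
}
```

Composite decoders (lists, case classes) would be derived from these primitives in the same way, which is where the "less reflection" advantage comes from.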
Re: Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 14, 2015, 8:21 p.m.) Review request for kafka. Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description (updated) --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments KAFKA-2123; more review fixes KAFKA-2123; refresh metadata on listOffset failure Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 405efdc7a59438731cbc3630876bda0042a3adb3 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ee1ede01efa070409b86f5d8874cd578e058ce51 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java PRE-CREATION core/src/test/scala/integration/kafka/api/ConsumerTest.scala 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 Diff: https://reviews.apache.org/r/36333/diff/ Testing --- Thanks, Jason Gustafson
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627018#comment-14627018 ] Ewen Cheslack-Postava commented on KAFKA-2260: -- Hey [~bkirwi], this looks interesting. Since it's a pretty important user-facing change, it'd be good to convert this into a KIP, send it to the mailing list, and get broader feedback on it. It looks like you've already got a lot of what you'd need here. The wip patch is also nice since it'll help people understand the impact. One thing you might want to add is a bit of discussion of other alternatives since it'll come up in the discussion anyway. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. 
- We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this change, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?)
I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
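The check-and-set append described in the proposal can be sketched with a minimal in-memory log (hypothetical code, not the actual `ByteBufferMessageSet` logic): -1 keeps today's "assign the next available offset" behaviour, while any other expected offset must equal the current log end offset or the append is rejected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory log illustrating the CAS append from KAFKA-2260.
public class CasLog {
    public static final long ANY_OFFSET = -1L; // sigil: current behaviour

    private final List<String> messages = new ArrayList<>();

    /** Returns the offset assigned to the message, or -1 on a CAS failure. */
    public synchronized long append(String message, long expectedOffset) {
        long logEndOffset = messages.size();
        if (expectedOffset != ANY_OFFSET && expectedOffset != logEndOffset)
            return -1L; // would surface as the proposed new 'CAS failure' error code
        messages.add(message);
        return logEndOffset;
    }
}
```

This is where the idempotence property comes from: a producer retrying a write with expected offset 5 either succeeds exactly once, or keeps failing once anything else has occupied offset 5.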
[jira] [Commented] (KAFKA-2334) Prevent HW from going back during leader failover
[ https://issues.apache.org/jira/browse/KAFKA-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627040#comment-14627040 ] Guozhang Wang commented on KAFKA-2334: -- One possible solution to this issue is to let the new leader only become available (i.e. start accepting Produce / Fetch requests for the partition) after its HW has caught up with its LEO. This will likely increase the unavailability window a bit, but in practice it should not have much performance impact, since most of the time HW == LEO, and even when it does not, the HW will quickly catch up. The tricky part is how to implement it without introducing too much logic complexity on the broker side. Prevent HW from going back during leader failover -- Key: KAFKA-2334 URL: https://issues.apache.org/jira/browse/KAFKA-2334 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Fix For: 0.9.0 Consider the following scenario: 0. Kafka uses a replication factor of 2, with broker B1 as the leader and B2 as the follower. 1. A producer keeps sending to Kafka with ack=-1. 2. A consumer repeatedly issues ListOffset requests to Kafka. And the following sequence: 0. B1's current log-end-offset (LEO) is 0, HW offset 0; same for B2. 1. B1 receives a ProduceRequest of 100 messages, appends them to its local log (LEO becomes 100) and holds the request in purgatory. 2. B1 receives a FetchRequest starting at offset 0 from follower B2, and returns the 100 messages. 3. B2 appends the received messages to its local log (LEO becomes 100). 4. B1 receives another FetchRequest starting at offset 100 from B2; knowing that B2's LEO has caught up to 100, it updates its own HW, satisfies the ProduceRequest in purgatory, and sends the FetchResponse with HW 100 back to B2 ASYNCHRONOUSLY. 5. B1 successfully sends the ProduceResponse to the producer and then fails, hence the FetchResponse does not reach B2, whose HW remains 0.
From the consumer's point of view, it could first see the latest offset as 100 (from B1), then see the latest offset as 0 (from B2), and then watch the latest offset gradually catch up to 100. This is because we use the HW to guard both the ListOffset and the Fetch-from-ordinary-consumer paths. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
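Guozhang's suggested fix can be sketched roughly as follows (hypothetical classes, not broker code): a newly elected leader refuses to serve the partition until its high watermark has caught up to its log end offset, and the HW itself only ever advances.

```java
// Hypothetical sketch of the KAFKA-2334 proposal: gate availability on HW == LEO.
public class NewLeaderState {
    private long highWatermark;
    private final long logEndOffset;

    public NewLeaderState(long highWatermark, long logEndOffset) {
        this.highWatermark = highWatermark;
        this.logEndOffset = logEndOffset;
    }

    /** Accept Produce/Fetch requests for the partition only after HW == LEO. */
    public boolean readyToServe() {
        return highWatermark >= logEndOffset;
    }

    /** Advance the HW from a follower's fetch position; it never moves back. */
    public void onFollowerFetch(long followerFetchOffset) {
        highWatermark = Math.max(highWatermark, Math.min(followerFetchOffset, logEndOffset));
    }
}
```

In the scenario above, B2 would take over leadership with HW 0 and LEO 100, and a consumer's ListOffset request would simply wait (or be redirected) until the HW reached 100, so the observed latest offset never goes backwards.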
[VOTE] KIP-26 Add Copycat connector framework for data import/export
Hi all, Let's start a vote on KIP-26: Add Copycat connector framework for data import/export For reference, here's the wiki: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767 And the mailing list thread (split across two months): http://mail-archives.apache.org/mod_mbox/kafka-dev/201506.mbox/%3CCAE1jLMOEJjnorFK5CtR3g-n%3Dm_AkrFsYeccsB4QimTRfGBrAGQ%40mail.gmail.com%3E http://mail-archives.apache.org/mod_mbox/kafka-dev/201507.mbox/%3CCAHwHRrUeNh%2BnCHwCTUCrcipHM3Po0ECUysO%2B%3DX3nwUeOGrcgdw%40mail.gmail.com%3E Just to clarify since this is a bit different from the KIPs voted on so far, the KIP just covers including Copycat in Kafka (rather than having it as a separate project). While the KIP aimed to be clear about the exact scope, the details require further discussion. The aim is to include some connectors as well, at a minimum for demonstration purposes, but the expectation is that connector development will, by necessity, be federated. I'll kick it off with a +1 (non-binding). -- Thanks, Ewen
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627184#comment-14627184 ] Jiangjie Qin commented on KAFKA-1835: - We discussed this in today's KIP hangout. Just to summarize the discussion. In the producer, we have the following methods that can potentially block: send() partitionsFor() According to KIP-19, all these methods will use max.block.ms to bound the blocking time. But this will cause users who set max.block.ms=0 to receive an exception on the first call to those methods, because no metadata is available when the methods are invoked for the first time. While there is a workaround for users to handle this, it is not an ideal API. For the improvement, there are the following use cases to consider: 1. Users can tolerate blocking. 2. Users don't want to block on send(), but can accept blocking on partitionsFor(). 3. Users don't want to block on either send() or partitionsFor(). Having discussed within LinkedIn, we found that letting partitionsFor() return a future might be a good solution to address all the requirements. For (1), users can set max.block.ms to be non-zero and just do a normal send. For (2), users can set max.block.ms=0 and call partitionsFor().get() followed by send(). For (3), users can set max.block.ms=0, call partitionsFor() followed by send(), and handle all the exceptions on their own. So with this proposal, max.block.ms will only be used by send(). A similar proposal applies to KAFKA-2275.
Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages(note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. 
I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
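The usage patterns discussed in the comment above can be sketched with a toy producer whose `partitionsFor()` returns a future. Everything here is invented for illustration (`ToyProducer`, the partition list, the exception choice); `CompletableFuture` merely stands in for whatever future type the real API would use.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the KAFKA-1835 proposal: with max.block.ms=0, send()
// never blocks. Case (2) callers wait once on the partitionsFor() future before
// sending; case (3) callers call send() directly and handle the exception.
public class PartitionsForSketch {
    public static class ToyProducer {
        private volatile List<Integer> cachedPartitions = null;

        public CompletableFuture<List<Integer>> partitionsFor(String topic) {
            // Pretend a metadata fetch completes asynchronously.
            return CompletableFuture.supplyAsync(() -> {
                cachedPartitions = List.of(0, 1, 2);
                return cachedPartitions;
            });
        }

        public void send(String topic, String value) {
            if (cachedPartitions == null)
                throw new IllegalStateException("no metadata yet and max.block.ms=0");
            // ... append the record for one of cachedPartitions ...
        }
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer();
        producer.partitionsFor("events").join(); // case (2): block once on metadata
        producer.send("events", "hello");        // now guaranteed not to throw
    }
}
```

Case (3) would simply skip the `join()` and wrap `send()` in a try/catch, dropping the record when metadata has not yet arrived.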
[jira] [Commented] (KAFKA-2275) Add a ListTopics() API to the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627186#comment-14627186 ] Onur Karaman commented on KAFKA-2275: - Currently, the consumer has the following API: public List<PartitionInfo> partitionsFor(String topic); As an alternative to creating a listTopics API, we can just replace the existing partitionsFor with: public Future<Map<String, List<PartitionInfo>>> partitionsFor(String... topics); The map's key is the topic and the values are the PartitionInfos for that topic. Passing in no topics would get the PartitionInfo for all topics. The listTopics API anyway needs to issue an empty metadata request which returns everything, so a two-step approach seems redundant. Having the same API in the producer may help address the concerns in KAFKA-1835. We considered other alternatives like ripping out partitionsFor from both KafkaConsumer and KafkaProducer into its own admin client. That admin client would only contain orchestration or cluster state APIs, but this wasn't really fleshed out and the above solution returning the Future<Map> seems to work for now. Add a ListTopics() API to the new consumer -- Key: KAFKA-2275 URL: https://issues.apache.org/jira/browse/KAFKA-2275 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Ashish K Singh Priority: Critical Fix For: 0.8.3 With regex subscription like {code} consumer.subscribe(topic*) {code} The partition assignment is automatically done at the Kafka side, while there are some use cases where consumers want regex subscriptions but not Kafka-side partition assignment, rather with their own specific partition assignment. With ListTopics() they can periodically check for topic list changes and specifically subscribe to the partitions of the new topics. For implementation, it involves sending a TopicMetadataRequest to a random broker and parsing the response.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/#review91675 --- core/src/main/scala/kafka/security/auth/PermissionType.scala (line 40) https://reviews.apache.org/r/34492/#comment145243 Same comment of Operation.scala also applies here. In addition, the return is redundant, right? - Edward Ribeiro On July 14, 2015, 9:13 p.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 14, 2015, 9:13 p.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Moving PermissionType to trait instead of enum. Following the convention for defining constants. Adding authorizer.config.path back. Diffs - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf core/src/main/scala/kafka/server/KafkaServer.scala 
18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/#review91679 --- core/src/main/scala/kafka/security/auth/Acl.scala (line 110) https://reviews.apache.org/r/34492/#comment145246 As the values are fixed you could have written toMap() as below so that we can save ourselves from creating a mutable Map just to convert it to an immutable Map at the end: def toMap(): Map[String, Any] = { Map(Acl.PrincipalKey -> principals.map(principal => principal.toString), Acl.PermissionTypeKey -> permissionType.name, Acl.OperationKey -> operations.map(operation => operation.name), Acl.HostsKey -> hosts) } - Edward Ribeiro On July 14, 2015, 9:13 p.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 14, 2015, 9:13 p.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Moving PermissionType to trait instead of enum. Following the convention for defining constants. Adding authorizer.config.path back.
Diffs - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627279#comment-14627279 ] Guozhang Wang commented on KAFKA-1835: -- Thanks [~becket_qin] for the summary. Some follow-up comments regarding the new API: I think case (3) is only for users who are willing to drop messages if topic metadata is not present, and hence call partitionsFor to get the partition info for the topic, but if it is not present will just drop the messages and move forward. To cover this case the following pattern: {code} partitionsFor(topic); send(new Record(topic, key, value)) {code} is a little bit awkward to me: since the user does not care whether the topic metadata is really present and is willing to drop messages if not, it makes little sense to call partitionsFor but check nothing on its return value before send, probably due to its function name. In addition, the behavior of partitionsFor(empty-list) as proposed in KAFKA-2275 is also not clear. Personally I feel it is more intuitive from the user's point of view to define the following: 1. max.block.ms only controls send() calls, which, depending on whether its value is 0 or not, throw an exception immediately or wait on a metadata refresh if the topic metadata is not available. 2. partitionsFor(single-topic) blocks UNTIL the topic metadata is valid; this function will only ever be called by users of cases (1) and (2). * If we decide to let max.block.ms only control send() calls, we may consider not making it a config but rather modifying the API to send(record, timeout, callback). 3. For KAFKA-2275 on consumers, still add the listTopics(timeout) API, which blocks for at most the timeout value. The consumer's partitionsFor() API will only ever be called by users who want to consume specific topics and also have customized partition management, and hence should tolerate blocking as well (i.e.
you cannot really be unblocking at all if you want to consume specific topic-partitions because you do not know when these partitions will be available). Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. 
For instance, the producer could accept and queue messages(note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] Json libraries for Kafka
Hi all, my pull request here: https://github.com/apache/kafka/pull/70/files#diff-59f3fe36571d1eee9f923df927a643eb would introduce a client-side dependency on the json-simple package. It is only used in a tool (VerifiableProducer.java), but it sounds like I should probably use Jackson instead? Feel free to make comments on the pull request itself if this is too tangential to the discussion. Thanks, Geoff On Tue, Jul 14, 2015 at 5:23 PM, Jay Kreps j...@confluent.io wrote: Ah, makes sense. Yes that addresses my concerns. -Jay On Tue, Jul 14, 2015 at 5:19 PM, Ismael Juma ism...@juma.me.uk wrote: Hi Jay, Comments inline. On Tue, Jul 14, 2015 at 11:04 PM, Jay Kreps j...@confluent.io wrote: Is this going to become a dependency for core and then transitively for the old clients? That's right. The current json library is definitely not great, but it does parse json and it's not used in any context where performance is a concern. The issue seemed to indicate that the blocking and slow performance were causing issues: https://issues.apache.org/jira/browse/KAFKA-1595 Because the older clients aren't well modularized, adding core dependencies sucks these up into every user of the clients. This particularly becomes a problem with common libraries since it will turn out we require version X but other code in the same app requires version Y. Yes, I understand. Still, if we use Jackson and only use methods that existed in 2.0, then it's unlikely to cause issues. I checked it with Jackson's author: https://twitter.com/ijuma/status/621106341503991808. The reasoning is as follows: - Jackson 1 and Jackson 2 use different packages, so they co-exist peacefully. - Jackson 2.x releases are backwards compatible so clients are free to choose whichever version they want. If they don't pick a version, then the highest version among the dependencies will be chosen. 
- It is possible that bugs in an untested release would cause issues, but that could affect the existing JSON parser too (clients may use different Scala versions). Limiting ourselves to methods from Jackson 2.0 is not as hard as it sounds because the code only interacts with our thin wrapper of Jackson, all the code that deals directly with Jackson is isolated. The new clients fix this issue but not everyone is using them yet. If there is a pressing need maybe we should just do it and people who have problems can just hack their build to exclude the dependency (since the client code won't need it). If not it might be better just to leave it for a bit until we have at least get a couple releases with both the new producer and the new consumer. Hacking the builds to exclude the transitive dependency would be a last resort, but not needed in most (hopefully all) cases. Does this make it less problematic in your view? If people are concerned about this, we can delay it, of course. A bit of a shame though, as this change improves performance, makes the code more readable and makes it safer. Ismael
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627190#comment-14627190 ] Joel Koshy commented on KAFKA-1835: --- bq. Similar proposal applies to KAFKA-2275 https://issues.apache.org/jira/browse/KAFKA-2275?focusedCommentId=14627186page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14627186 Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. 
It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages(note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
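The pre-initialization idea from the ticket — resolve metadata for a configured topic list at construction time, within a bounded timeout, so a first send() never blocks — can be sketched roughly as below. This is a toy illustration, not Kafka's producer: the class name, the faked metadata fetch, and surfacing the two proposed settings as constructor arguments are all assumptions.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy sketch of the proposed pre.initialize.topics / pre.initialize.timeout
// behavior. The metadata fetch is faked; a real client would do a broker
// round-trip per topic.
public class PreInitProducerSketch {
    private final Set<String> ready = new HashSet<>();

    public PreInitProducerSketch(List<String> preInitializeTopics, long preInitializeTimeoutMs) {
        long deadline = System.currentTimeMillis() + preInitializeTimeoutMs;
        for (String topic : preInitializeTopics) {
            if (System.currentTimeMillis() > deadline) {
                // one option from the ticket: fail creation of the client
                throw new IllegalStateException("metadata pre-initialization timed out");
            }
            ready.add(fetchMetadata(topic));
        }
    }

    private String fetchMetadata(String topic) {
        return topic; // stand-in for fetching partition metadata from a broker
    }

    // The other option from the ticket: fail sends (rather than block) for
    // topics whose metadata is not yet available.
    public boolean canSendWithoutBlocking(String topic) {
        return ready.contains(topic);
    }
}
```

Any blocking is thus confined to construction, and the corner case (a topic not in the pre-init list seen for the first time) stays visible as an explicit check rather than a surprise block inside send().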
Re: [DISCUSS] Json libraries for Kafka
On Tue, Jul 14, 2015 at 10:46 PM, Joe Stein joe.st...@stealth.ly wrote: Fasterxml/Jackson +1 to that. The scala databinds to case classes are gr8. To be clear, case classes support would require the Scala module for Jackson and the Scala versions headache that goes with it (2.9 support is likely to be an issue, for example). Also, spray-json supports case classes mapping and does it in a safer way (compile-time errors if the case class contains fields that can't be mapped). Ismael
Re: [DISCUSS] Json libraries for Kafka
Hi Jay, Comments inline. On Tue, Jul 14, 2015 at 11:04 PM, Jay Kreps j...@confluent.io wrote: Is this going to become a dependency for core and then transitively for the old clients? That's right. The current json library is definitely not great, but it does parse json and it's not used in any context where performance is a concern. The issue seemed to indicate that the blocking and slow performance were causing issues: https://issues.apache.org/jira/browse/KAFKA-1595 Because the older clients aren't well modularized, adding core dependencies sucks these up into every user of the clients. This particularly becomes a problem with common libraries since it will turn out we require version X but other code in the same app requires version Y. Yes, I understand. Still, if we use Jackson and only use methods that existed in 2.0, then it's unlikely to cause issues. I checked it with Jackson's author: https://twitter.com/ijuma/status/621106341503991808. The reasoning is as follows: - Jackson 1 and Jackson 2 use different packages, so they co-exist peacefully. - Jackson 2.x releases are backwards compatible so clients are free to choose whichever version they want. If they don't pick a version, then the highest version among the dependencies will be chosen. - It is possible that bugs in an untested release would cause issues, but that could affect the existing JSON parser too (clients may use different Scala versions). Limiting ourselves to methods from Jackson 2.0 is not as hard as it sounds because the code only interacts with our thin wrapper of Jackson, all the code that deals directly with Jackson is isolated. The new clients fix this issue but not everyone is using them yet. If there is a pressing need maybe we should just do it and people who have problems can just hack their build to exclude the dependency (since the client code won't need it). 
If not it might be better just to leave it for a bit until we have at least a couple of releases with both the new producer and the new consumer. Hacking the builds to exclude the transitive dependency would be a last resort, but not needed in most (hopefully all) cases. Does this make it less problematic in your view? If people are concerned about this, we can delay it, of course. A bit of a shame though, as this change improves performance, makes the code more readable and makes it safer. Ismael
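The isolation argument made above — only a thin wrapper touches the JSON library, so "stick to Jackson 2.0-era methods" is enforced in one place — can be illustrated with a sketch like this. The interface and class names are invented, and the stub parser is a deliberately naive stand-in so the example is self-contained; in the actual change, the single implementing class would be the only one importing Jackson.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the thin-wrapper pattern: callers depend only on JsonParser, so
// the one class that touches the JSON library is the only place pinned to
// its API. The stub below is a toy for flat {"k":"v"} objects, not a real
// JSON parser.
public class JsonFacadeSketch {
    // The only surface the rest of the codebase sees.
    public interface JsonParser {
        Map<String, String> parseFlatObject(String json);
    }

    // In the real change, this class alone would import com.fasterxml.jackson.*
    // and restrict itself to Jackson 2.0-era methods.
    public static class StubParser implements JsonParser {
        public Map<String, String> parseFlatObject(String json) {
            Map<String, String> out = new LinkedHashMap<>();
            String body = json.trim();
            body = body.substring(1, body.length() - 1); // strip { }
            for (String pair : body.split(",")) {
                String[] kv = pair.split(":", 2);
                out.put(unquote(kv[0]), unquote(kv[1]));
            }
            return out;
        }

        private static String unquote(String s) {
            s = s.trim();
            return s.substring(1, s.length() - 1); // strip surrounding quotes
        }
    }
}
```

Swapping JSON libraries (or Jackson major versions) then touches one class rather than every call site, which is what makes the "limit ourselves to 2.0 methods" constraint practical to audit.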
[GitHub] kafka pull request: KAFKA-2145: Add a log config so users can defi...
GitHub user Parth-Brahmbhatt opened a pull request: https://github.com/apache/kafka/pull/77 KAFKA-2145: Add a log config so users can define topic owners. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Parth-Brahmbhatt/kafka KAFKA-2145 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/77.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #77 commit de9c4efac53b52923bb2002536b4a2a7725541e9 Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Date: 2015-07-15T00:54:40Z KAFKA-2145: Add a log config so users can define topic owners. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Re: [VOTE] KIP-26 Add Copycat connector framework for data import/export
+1. This was long due! On 7/14/15, 4:42 PM, Guozhang Wang wangg...@gmail.com wrote: +1. Thanks Ewen!! On Tue, Jul 14, 2015 at 3:01 PM, Neha Narkhede n...@confluent.io wrote: +1 (binding) Thanks Ewen for taking on something that the Kafka project has long waited for! On Tue, Jul 14, 2015 at 2:58 PM, Jay Kreps j...@confluent.io wrote: +1 Super excited! -Jay On Tue, Jul 14, 2015 at 2:09 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Hi all, Let's start a vote on KIP-26: Add Copycat connector framework for data import/export For reference, here's the wiki: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767 And the mailing list thread (split across two months): http://mail-archives.apache.org/mod_mbox/kafka-dev/201506.mbox/%3CCAE1jLM OEJjnorFK5CtR3g-n%3Dm_AkrFsYeccsB4QimTRfGBrAGQ%40mail.gmail.com%3E http://mail-archives.apache.org/mod_mbox/kafka-dev/201507.mbox/%3CCAHwHRr UeNh%2BnCHwCTUCrcipHM3Po0ECUysO%2B%3DX3nwUeOGrcgdw%40mail.gmail.com%3E Just to clarify since this is a bit different from the KIPs voted on so far, the KIP just covers including Copycat in Kafka (rather than having it as a separate project). While the KIP aimed to be clear about the exact scope, the details require further discussion. The aim is to include some connectors as well, at a minimum for demonstration purposes, but the expectation is that connector development will, by necessity, be federated. I'll kick it off with a +1 (non-binding). -- Thanks, Ewen -- Thanks, Neha -- -- Guozhang
[GitHub] kafka pull request: KAFKA-1595; Remove deprecated and slower scala...
Github user ijuma closed the pull request at: https://github.com/apache/kafka/pull/74 ---
[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627269#comment-14627269 ] ASF GitHub Bot commented on KAFKA-1595: --- Github user ijuma closed the pull request at: https://github.com/apache/kafka/pull/74 Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount - Key: KAFKA-1595 URL: https://issues.apache.org/jira/browse/KAFKA-1595 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Jagbir Assignee: Ismael Juma Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1595.patch The following issue is created as a follow up suggested by Jun Rao in a kafka news group message with the Subject Blocking Recursive parsing from kafka.consumer.TopicCount$.constructTopicCount SUMMARY: An issue was detected in a typical cluster of 3 kafka instances backed by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3, java version 1.7.0_65). On consumer end, when consumers get recycled, there is a troubling JSON parsing recursion which takes a busy lock and blocks consumers thread pool. In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes a global lock (0xd3a7e1d0) during the rebalance, and fires an expensive JSON parsing, while keeping the other consumers from shutting down, see, e.g, at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161) The deep recursive JSON parsing should be deprecated in favor of a better JSON parser, see, e.g, http://engineering.ooyala.com/blog/comparing-scala-json-libraries? DETAILS: The first dump is for a recursive blocking thread holding the lock for 0xd3a7e1d0 and the subsequent dump is for a waiting thread. (Please grep for 0xd3a7e1d0 to see the locked object.) 
-8- Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000] java.lang.Thread.State: RUNNABLE at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722) at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726) at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737) at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at
Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/#review91682 --- core/src/main/scala/kafka/security/auth/ResourceType.scala (line 45) https://reviews.apache.org/r/34492/#comment145249 Same comment as Permission, and PermissionType, plus the 'return' is redundant, no? - Edward Ribeiro On July 14, 2015, 9:13 p.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 14, 2015, 9:13 p.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Moving PermissionType to trait instead of enum. Following the convention for defining constants. Adding authorizer.config.path back. Diffs - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 
core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Commented] (KAFKA-2092) New partitioning for better load balancing
[ https://issues.apache.org/jira/browse/KAFKA-2092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627325#comment-14627325 ] Jason Gustafson commented on KAFKA-2092: [~azaroth], thanks for the update. I think the only concern is that the use case is a little complex. As you say, it doesn't make much sense to use this partitioner without an intermediate round of processing. It may be better to keep the partitioning strategies bundled with Kafka more basic. I wonder if this might be a better fit in Samza where it might be able to be leveraged directly? New partitioning for better load balancing -- Key: KAFKA-2092 URL: https://issues.apache.org/jira/browse/KAFKA-2092 Project: Kafka Issue Type: Improvement Components: producer Reporter: Gianmarco De Francisci Morales Assignee: Jun Rao Attachments: KAFKA-2092-v1.patch, KAFKA-2092-v2.patch We have recently studied the problem of load balancing in distributed stream processing systems such as Samza [1]. In particular, we focused on what happens when the key distribution of the stream is skewed when using key grouping. We developed a new stream partitioning scheme (which we call Partial Key Grouping). It achieves better load balancing than hashing while being more scalable than round robin in terms of memory. In the paper we show a number of mining algorithms that are easy to implement with partial key grouping, and whose performance can benefit from it. We think that it might also be useful for a larger class of algorithms. PKG has already been integrated in Storm [2], and I would like to be able to use it in Samza as well. As far as I understand, Kafka producers are the ones that decide how to partition the stream (or Kafka topic). I do not have experience with Kafka, however partial key grouping is very easy to implement: it requires just a few lines of code in Java when implemented as a custom grouping in Storm [3]. I believe it should be very easy to integrate. 
For all these reasons, I believe it will be a nice addition to Kafka/Samza. If the community thinks it's a good idea, I will be happy to offer support in the porting. References: [1] https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf [2] https://issues.apache.org/jira/browse/STORM-632 [3] https://github.com/gdfm/partial-key-grouping -- This message was sent by Atlassian JIRA (v6.3.4#6332)
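For readers unfamiliar with it, the core of Partial Key Grouping really is "just a few lines": each key hashes to two candidate partitions, and each message is routed to whichever candidate has received less load so far, which bounds the imbalance caused by skewed keys. The sketch below is an illustration with invented names and hash choices — it is neither the Storm implementation nor a Kafka Partitioner.

```java
import java.util.Arrays;

// Hedged sketch of Partial Key Grouping (PKG): two hash candidates per key,
// route to the less loaded one. Hash derivation and class shape are
// illustrative assumptions.
public class PartialKeyGrouping {
    private final long[] load; // messages routed to each partition so far

    public PartialKeyGrouping(int numPartitions) {
        this.load = new long[numPartitions];
    }

    public int partition(String key) {
        int h1 = Math.floorMod(key.hashCode(), load.length);
        // second candidate from a salted hash; a real implementation would use
        // two independent hash functions
        int h2 = Math.floorMod((key + "/pkg-2").hashCode(), load.length);
        int choice = load[h1] <= load[h2] ? h1 : h2;
        load[choice]++;
        return choice;
    }

    public long[] counts() {
        return Arrays.copyOf(load, load.length);
    }

    public static void main(String[] args) {
        PartialKeyGrouping pkg = new PartialKeyGrouping(4);
        for (int i = 0; i < 1000; i++) pkg.partition("hot-key"); // heavily skewed stream
        System.out.println(Arrays.toString(pkg.counts()));
    }
}
```

The trade-off Jason alludes to is visible here: a key's messages land on up to two partitions, so any per-key aggregation downstream needs an extra combining step, which is why the scheme suits an intermediate processing round more than plain keyed consumption.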
Re: [DISCUSS] Json libraries for Kafka
Yeah we really can't add any dependencies for that client jar. Maybe we could move the perf test to the tools jar though? -Jay On Tue, Jul 14, 2015 at 6:01 PM, Geoffrey Anderson ge...@confluent.io wrote: Hi all, my pull request here: https://github.com/apache/kafka/pull/70/files#diff-59f3fe36571d1eee9f923df927a643eb would introduce a client-side dependency on the json-simple package. It is only used in a tool (VerifiableProducer.java), but it sounds like I should probably use Jackson instead? Feel free to make comments on the pull request itself if this is too tangential to the discussion. Thanks, Geoff On Tue, Jul 14, 2015 at 5:23 PM, Jay Kreps j...@confluent.io wrote: Ah, makes sense. Yes that addresses my concerns. -Jay On Tue, Jul 14, 2015 at 5:19 PM, Ismael Juma ism...@juma.me.uk wrote: Hi Jay, Comments inline. On Tue, Jul 14, 2015 at 11:04 PM, Jay Kreps j...@confluent.io wrote: Is this going to become a dependency for core and then transitively for the old clients? That's right. The current json library is definitely not great, but it does parse json and it's not used in any context where performance is a concern. The issue seemed to indicate that the blocking and slow performance were causing issues: https://issues.apache.org/jira/browse/KAFKA-1595 Because the older clients aren't well modularized, adding core dependencies sucks these up into every user of the clients. This particularly becomes a problem with common libraries since it will turn out we require version X but other code in the same app requires version Y. Yes, I understand. Still, if we use Jackson and only use methods that existed in 2.0, then it's unlikely to cause issues. I checked it with Jackson's author: https://twitter.com/ijuma/status/621106341503991808. The reasoning is as follows: - Jackson 1 and Jackson 2 use different packages, so they co-exist peacefully. - Jackson 2.x releases are backwards compatible so clients are free to choose whichever version they want. 
If they don't pick a version, then the highest version among the dependencies will be chosen. - It is possible that bugs in an untested release would cause issues, but that could affect the existing JSON parser too (clients may use different Scala versions). Limiting ourselves to methods from Jackson 2.0 is not as hard as it sounds because the code only interacts with our thin wrapper of Jackson, all the code that deals directly with Jackson is isolated. The new clients fix this issue but not everyone is using them yet. If there is a pressing need maybe we should just do it and people who have problems can just hack their build to exclude the dependency (since the client code won't need it). If not it might be better just to leave it for a bit until we have at least a couple of releases with both the new producer and the new consumer. Hacking the builds to exclude the transitive dependency would be a last resort, but not needed in most (hopefully all) cases. Does this make it less problematic in your view? If people are concerned about this, we can delay it, of course. A bit of a shame though, as this change improves performance, makes the code more readable and makes it safer. Ismael
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627375#comment-14627375 ] Joel Koshy commented on KAFKA-1835: --- bq. I think case (3) is only for users who are willing to drop messages if topic metadata is not present I'm not sure I completely agree with the above - there are applications that care about having completely non-blocking APIs - as evidenced by plenty of emails on the list. However, that does not mean such applications are willing to drop messages if metadata is not present at the time of the call. This is the case for event-driven programming (think node-js style) where the application can go do something else in its event processing loop if a library call returns a future that isn't ready at that specific moment. bq. the behavior of partitionsFor(empty-list) as proposed in KAFKA-2275 is also not clear Agreed that it can be unclear. i.e., we should probably still have a listTopics API. Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. 
Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages(note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2123) Make new consumer offset commit API use callback + future
[ https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627383#comment-14627383 ] Jason Gustafson commented on KAFKA-2123: Updated reviewboard https://reviews.apache.org/r/36333/diff/ against branch upstream/trunk Make new consumer offset commit API use callback + future - Key: KAFKA-2123 URL: https://issues.apache.org/jira/browse/KAFKA-2123 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-2123.patch, KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, KAFKA-2123_2015-05-01_19:33:19.patch, KAFKA-2123_2015-05-04_09:39:50.patch, KAFKA-2123_2015-05-04_22:51:48.patch, KAFKA-2123_2015-05-29_11:11:05.patch, KAFKA-2123_2015-07-11_17:33:59.patch, KAFKA-2123_2015-07-13_18:45:08.patch, KAFKA-2123_2015-07-14_13:20:25.patch, KAFKA-2123_2015-07-14_18:21:38.patch The current version of the offset commit API in the new consumer is void commit(offsets, commit type) where the commit type is either sync or async. This means you need to use sync if you ever want confirmation that the commit succeeded. Some applications will want to use asynchronous offset commit, but be able to tell when the commit completes. This is basically the same problem that had to be fixed going from old consumer - new consumer and I'd suggest the same fix using a callback + future combination. The new API would be FutureVoid commit(MapTopicPartition, Long offsets, ConsumerCommitCallback callback); where ConsumerCommitCallback contains a single method: public void onCompletion(Exception exception); We can provide shorthand variants of commit() for eliding the different arguments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
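The API shape proposed in the ticket — an async commit returning a future, with an optional completion callback — looks roughly like this sketch. The consumer internals are faked (offsets keyed by plain strings instead of TopicPartition, and the "broker round-trip" completes immediately), so only the shape is meaningful, not the implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Sketch of the Future + callback commit API from the ticket; names follow
// the proposal, internals are stand-ins.
public class CommitApiSketch {
    // Per the proposal: a single-method callback, exception is null on success.
    public interface ConsumerCommitCallback {
        void onCompletion(Exception exception);
    }

    public Future<Void> commit(Map<String, Long> offsets, ConsumerCommitCallback callback) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            // stand-in for the async offset-commit round-trip to the coordinator
            if (callback != null) callback.onCompletion(null);
            result.complete(null);
        } catch (Exception e) {
            if (callback != null) callback.onCompletion(e);
            result.completeExceptionally(e);
        }
        return result;
    }
}
```

Callers wanting sync semantics block on the returned future; async callers pass a callback and move on. The shorthand variants mentioned in the ticket would simply elide one or both arguments.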
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627395#comment-14627395 ] Jay Kreps commented on KAFKA-1835: -- Changing the return type of a method is a hard compatibility break, though, right? It will be impossible for code to work with both prior and future versions. From my point of view that kind of thing is a non-starter--imagine trying to role that out in a large org. [~jjkoshy] I agree with your use case, but I think [~guozhang] is right that it does seem more natural for people who want to guarantee non-blocking usage to set max.block.ms=0 and have send throw an exception even in that case, right? If you get the exception you would go back into your event loop and do your other stuff. Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. 
There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages(note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
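Jay's suggested semantics — max.block.ms=0 makes send() fail fast instead of blocking when metadata is absent, letting the application's event loop decide whether to retry or drop — can be sketched like this. Everything here is illustrative: the class, the TimeoutException choice, and the faked metadata cache are not Kafka's actual client code.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Toy illustration of fail-fast send semantics under a zero blocking budget.
public class NonBlockingSendSketch {
    private final Set<String> knownTopics = new HashSet<>();
    private final long maxBlockMs;

    public NonBlockingSendSketch(long maxBlockMs) {
        this.maxBlockMs = maxBlockMs;
    }

    public CompletableFuture<Long> send(String topic, byte[] value) {
        if (!knownTopics.contains(topic)) {
            if (maxBlockMs == 0) {
                // fail fast: return an already-failed future; the caller's
                // event loop can retry later or drop the message
                CompletableFuture<Long> failed = new CompletableFuture<>();
                failed.completeExceptionally(new TimeoutException("no metadata for " + topic));
                return failed;
            }
            // a real client would wait up to maxBlockMs for metadata here
            knownTopics.add(topic);
        }
        return CompletableFuture.completedFuture(0L); // pretend the append returned offset 0
    }
}
```

One design detail differs from Jay's wording: rather than having send() throw, this sketch delivers the failure through the returned future, which keeps the method itself safe to call on a critical request path either way — whether the exception is thrown or wrapped is exactly the compatibility question debated in the comments.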
Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/#review91672 --- core/src/main/scala/kafka/security/auth/Acl.scala (line 24) https://reviews.apache.org/r/34492/#comment145239 nit: what about switch this lines 23 and 24 and then use WildCardHost as replacement for the second literal parameter of new KafkaPrincipal? - Edward Ribeiro On July 14, 2015, 9:13 p.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 14, 2015, 9:13 p.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Moving PermissionType to trait instead of enum. Following the convention for defining constants. Adding authorizer.config.path back. 
Diffs - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627263#comment-14627263 ] Mayuresh Gharat commented on KAFKA-1835: Either way the user will have to block or handle exceptions. Returning a future, as Becket said, looks good. But does that mean we tell the user that he should not do a send() until the future has returned? Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed.
It is also worth noting more nuanced solutions exist that could work without the extra settings; they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages (note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
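The pre-initialization settings discussed above (pre.initialize.topics / pre.initialize.timeout) amount to doing the blocking metadata fetch eagerly at producer init, with a bounded wait. A minimal sketch of that idea; the fetchPartitions stand-in and the 5-second timeout are assumptions for illustration, not the real producer API:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PreInitSketch {
    // stand-in for the producer's blocking metadata fetch (hypothetical)
    static List<Integer> fetchPartitions(String topic) {
        return List.of(0, 1, 2); // pretend the topic has 3 partitions
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // pre.initialize.topics: warm up metadata at init, not on the first send()
        Future<List<Integer>> warmup = pool.submit(() -> fetchPartitions("my-topic"));
        try {
            // pre.initialize.timeout: bound how long init may block (5s is made up)
            List<Integer> partitions = warmup.get(5, TimeUnit.SECONDS);
            System.out.println("pre-initialized partitions: " + partitions.size());
        } catch (TimeoutException e) {
            // the two options from the ticket: fail client creation,
            // or fail all sends until the metadata is available
            throw new IllegalStateException("metadata not available at init", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Either failure option from the ticket fits in the catch block; the key point is that the potential block moves out of the request path.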
Re: [DISCUSS] Json libraries for Kafka
Ewen, On Tue, Jul 14, 2015 at 10:41 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Currently the clients/server mismatch wouldn't be an issue since there are no client-side uses of JSON, right? That said, if Copycat ends up included in Kafka we'll need to provide at least one serializer which would be written in Java and I suspect some people would like JSON to be a candidate for that. That's right, I was told that the discussion regarding a JSON library for the client is for a future need, but something to take into account. Ismael
Re: [CFP] Developer showcase at Strata NYC
You (or someone working on them) should submit the projects :) On Tue, Jul 14, 2015 at 4:27 PM, Neha Narkhede n...@confluent.io wrote: All of the ones you've mentioned there sound great. I'd want to see copycat there. On Tue, Jul 14, 2015 at 4:22 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka fans, Strata NYC is looking for emerging, open source, big data projects to highlight in the developer showcase: http://strataconf.com/big-data-conference-ny-2015/public/cfp/409 Depending on how you look at it, it is either a science fair for grownups or a free conference booth for open source companies. Apache Kafka itself no longer fits the criteria of emerging (yay!), but there are a lot of Kafka-related projects out there that will fit. Things like burrow, yahoo's management console, schema repository, rest proxy, ducktape, etc. So, if you have a pet project that you want to promote and you want the best platform possible to do it on - please send it in! We want lots of Kafka-related projects in there :) Gwen -- Thanks, Neha
Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review
On Tue, Jul 14, 2015 at 6:15 PM, Jun Rao j...@confluent.io wrote: I made a couple of changes to the new Jenkins job. Could you try again? It's still not working, unfortunately. It may or may not be related to: https://blogs.apache.org/infra/entry/mirroring_to_github_issues For b, if we can't easily change the behavior of the pull request bot, we can also just flip back and forth between In Progress and Open. It turns out that the ASF bot doesn't change the status at all. Spark is using a different mechanism to cause the status change (via a `sparkuser`). The JIRA ticket status will have to be changed manually for now. I have updated the instructions accordingly. I was told in the INFRA channel to file a JIRA ticket and they would see if auto transition listeners can be used to automate this (no guarantees though). Ismael
[jira] [Work started] (KAFKA-2145) An option to add topic owners.
[ https://issues.apache.org/jira/browse/KAFKA-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-2145 started by Parth Brahmbhatt. --- An option to add topic owners. --- Key: KAFKA-2145 URL: https://issues.apache.org/jira/browse/KAFKA-2145 Project: Kafka Issue Type: Improvement Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt We need to expose a way so users can identify users/groups that share ownership of topic. We discussed adding this as part of https://issues.apache.org/jira/browse/KAFKA-2035 and agreed that it will be simpler to add owner as a logconfig. The owner field can be used for auditing and also by authorization layer to grant access without having to explicitly configure acls. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2123) Make new consumer offset commit API use callback + future
[ https://issues.apache.org/jira/browse/KAFKA-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-2123: --- Attachment: KAFKA-2123_2015-07-14_18:21:38.patch Make new consumer offset commit API use callback + future - Key: KAFKA-2123 URL: https://issues.apache.org/jira/browse/KAFKA-2123 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-2123.patch, KAFKA-2123.patch, KAFKA-2123_2015-04-30_11:23:05.patch, KAFKA-2123_2015-05-01_19:33:19.patch, KAFKA-2123_2015-05-04_09:39:50.patch, KAFKA-2123_2015-05-04_22:51:48.patch, KAFKA-2123_2015-05-29_11:11:05.patch, KAFKA-2123_2015-07-11_17:33:59.patch, KAFKA-2123_2015-07-13_18:45:08.patch, KAFKA-2123_2015-07-14_13:20:25.patch, KAFKA-2123_2015-07-14_18:21:38.patch The current version of the offset commit API in the new consumer is void commit(offsets, commit type) where the commit type is either sync or async. This means you need to use sync if you ever want confirmation that the commit succeeded. Some applications will want to use asynchronous offset commit, but be able to tell when the commit completes. This is basically the same problem that had to be fixed going from old consumer -> new consumer and I'd suggest the same fix using a callback + future combination. The new API would be Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); where ConsumerCommitCallback contains a single method: public void onCompletion(Exception exception); We can provide shorthand variants of commit() for eliding the different arguments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
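The callback + future combination proposed in the ticket can be sketched as below. This is a toy illustration, not the real KafkaConsumer API: String keys stand in for TopicPartition, and CompletableFuture stands in for whatever Future type the consumer would return. The point it demonstrates is that the returned future completes only after the callback has run, so a caller can stay fully asynchronous or block at its own choosing:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CommitSketch {
    // single-method callback, as proposed in the ticket
    interface ConsumerCommitCallback {
        void onCompletion(Exception exception);
    }

    // toy async "commit": the returned future completes only after the callback has run
    static CompletableFuture<Void> commit(Map<String, Long> offsets,
                                          ConsumerCommitCallback callback) {
        CompletableFuture<Void> request =
            CompletableFuture.runAsync(() -> { /* pretend network round-trip */ });
        return request.whenComplete((v, err) ->
            callback.onCompletion(err == null ? null : new Exception(err)));
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("my-topic-0", 42L);
        boolean[] callbackRan = {false};
        // async by default; the caller decides whether (and how long) to block
        commit(offsets, e -> callbackRan[0] = true).get();
        System.out.println("callback ran: " + callbackRan[0]);
    }
}
```

Returning the whenComplete stage (rather than the raw request future) is what guarantees the callback has executed before get() returns.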
Re: Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 15, 2015, 1:21 a.m.) Review request for kafka. Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description (updated) --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments KAFKA-2123; more review fixes KAFKA-2123; refresh metadata on listOffset failure KAFKA-2123; fix comment issues Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 405efdc7a59438731cbc3630876bda0042a3adb3 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ee1ede01efa070409b86f5d8874cd578e058ce51 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java PRE-CREATION core/src/test/scala/integration/kafka/api/ConsumerTest.scala 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 Diff: https://reviews.apache.org/r/36333/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/#review91671 --- core/src/main/scala/kafka/security/auth/Operation.scala (line 35) https://reviews.apache.org/r/34492/#comment145238 Scala's match is a powerful mechanism but using it to decode as below seems boilerplate-ish. Why not use something like:

def fromString(operation: String): Operation = {
  val op = values().filter(_.name.equalsIgnoreCase(operation)).headOption
  op match {
    case Some(x) => x
  }
}

or even:

def fromString(operation: String): Operation = {
  val Some(op) = values().filter(_.name.equalsIgnoreCase(operation)).headOption
  op
}

- Edward Ribeiro On July 14, 2015, 9:13 p.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 14, 2015, 9:13 p.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Moving PermissionType to trait instead of enum. Following the convention for defining constants. Adding authorizer.config.path back. 
Diffs - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
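The decode pattern suggested in the Operation.scala review (scan the enum values case-insensitively instead of writing a case per constant) translates directly to Java. A hedged analogue with a hypothetical Operation enum, not the actual Kafka class:

```java
import java.util.Arrays;

public class OperationSketch {
    enum Operation {
        READ, WRITE, CREATE, DELETE;

        // decode without per-constant boilerplate: scan values case-insensitively
        static Operation fromString(String s) {
            return Arrays.stream(values())
                .filter(op -> op.name().equalsIgnoreCase(s))
                .findFirst()
                .orElseThrow(() ->
                    new IllegalArgumentException("Unknown operation: " + s));
        }
    }

    public static void main(String[] args) {
        System.out.println(Operation.fromString("read"));
        System.out.println(Operation.fromString("Write"));
    }
}
```

Unlike the reviewer's second Scala variant (which fails a pattern match on unknown input), this version raises a descriptive exception for unknown names, which is usually friendlier for config parsing.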
Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/#review91676 --- core/src/main/scala/kafka/security/auth/Resource.scala (line 25) https://reviews.apache.org/r/34492/#comment145244 In KafkaPrincipal you split like:

val arr: Array[String] = str.split(Separator, 2) // only split into two parts

But here you just call split without a second parameter. Choose one approach and make the usage uniform. :) - Edward Ribeiro On July 14, 2015, 9:13 p.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 14, 2015, 9:13 p.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Moving PermissionType to trait instead of enum. Following the convention for defining constants. Adding authorizer.config.path back. 
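The split-limit inconsistency flagged in the Resource.scala review matters because the two overloads behave differently once the value itself contains the separator. A small illustration; the principal string here is made up:

```java
public class SplitSketch {
    public static void main(String[] args) {
        String principal = "User:bob:extra"; // hypothetical principal string
        // limit 2: split at the first separator only; the tail keeps its colons
        String[] twoParts = principal.split(":", 2);
        // no limit: split at every separator
        String[] allParts = principal.split(":");
        System.out.println(twoParts.length + " parts, tail = " + twoParts[1]);
        System.out.println(allParts.length + " parts");
    }
}
```

With the limit, the second component survives intact ("bob:extra"); without it, the same input silently becomes three pieces, which is why using one convention everywhere avoids subtle parsing bugs.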
[CFP] Developer showcase at Strata NYC
Hi Kafka fans, Strata NYC is looking for emerging, open source, big data projects to highlight in the developer showcase: http://strataconf.com/big-data-conference-ny-2015/public/cfp/409 Depending on how you look at it, it is either a science fair for grownups or a free conference booth for open source companies. Apache Kafka itself no longer fits the criteria of emerging (yay!), but there are a lot of Kafka-related projects out there that will fit. Things like burrow, yahoo's management console, schema repository, rest proxy, ducktape, etc. So, if you have a pet project that you want to promote and you want the best platform possible to do it on - please send it in! We want lots of Kafka-related projects in there :) Gwen
[jira] [Commented] (KAFKA-2145) An option to add topic owners.
[ https://issues.apache.org/jira/browse/KAFKA-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627359#comment-14627359 ] ASF GitHub Bot commented on KAFKA-2145: --- GitHub user Parth-Brahmbhatt opened a pull request: https://github.com/apache/kafka/pull/77 KAFKA-2145: Add a log config so users can define topic owners. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Parth-Brahmbhatt/kafka KAFKA-2145 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/77.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #77 commit de9c4efac53b52923bb2002536b4a2a7725541e9 Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Date: 2015-07-15T00:54:40Z KAFKA-2145: Add a log config so users can define topic owners. An option to add topic owners. --- Key: KAFKA-2145 URL: https://issues.apache.org/jira/browse/KAFKA-2145 Project: Kafka Issue Type: Improvement Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt We need to expose a way so users can identify users/groups that share ownership of topic. We discussed adding this as part of https://issues.apache.org/jira/browse/KAFKA-2035 and agreed that it will be simpler to add owner as a logconfig. The owner field can be used for auditing and also by authorization layer to grant access without having to explicitly configure acls. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Request to be added to the contributor list
Added. Thanks, Jun On Tue, Jul 14, 2015 at 11:58 AM, Aravind Selvan aravind...@gmail.com wrote: Hi, Please add me to the contributor list. Thanks. ~Aravind Selvan
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627400#comment-14627400 ] Guozhang Wang commented on KAFKA-1835: -- Joel, I think you are referring to some pattern like:

{code}
init() {
  Future<..> future = partitionsFor(topic);
}

handle(req) {
  if (future.ready())
    // send this message related to req and all other buffered ones
  else
    // buffer this event for sending
}
{code}

for event-driven programming right? But I think in this case users would usually not bother checking if the topic metadata is available or not through partitionsFor since they are not in a hurry sending the messages anyways, but rather try-send-otherwise-do-sth like:

{code}
handle(req) {
  try {
    producer.send(req + previously-buffered messages)
  } catch {
    // not successful, either drop it or buffer it for next send call
  }
}
{code}

I guess what I am trying to argue here is that for users who really want complete non-blocking operations it makes little difference for them to hold a Future object at hand and check periodically rather than just handling exceptions up-front. The same argument stands for buffer-exhausted as well: users usually do not prefer to check a variable indicating if the buffer is full or not every time they want to send something without blocking, but rather configure it to let producers throw exceptions directly and handle them. Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. 
This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages(note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
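The try-send-otherwise-buffer pattern Guozhang sketches above can be made concrete as a runnable toy. Everything here is a stand-in (the send method, its metadata flag, and the message strings), not the real producer; the shape of the control flow is the point:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BufferingSendSketch {
    static boolean metadataReady = false; // stand-in for "topic metadata fetched"

    // stand-in for producer.send(): throws until metadata is available
    static void send(String msg) {
        if (!metadataReady) throw new IllegalStateException("metadata not available");
        System.out.println("sent: " + msg);
    }

    public static void main(String[] args) {
        Deque<String> buffer = new ArrayDeque<>();
        for (String req : new String[] {"a", "b", "c"}) {
            try {
                while (!buffer.isEmpty()) {  // resend previously-buffered messages first
                    send(buffer.peekFirst());
                    buffer.pollFirst();      // drop only once the send succeeded
                }
                send(req);
            } catch (IllegalStateException e) {
                buffer.addLast(req);         // not successful: buffer for the next call
            }
            if (req.equals("b")) metadataReady = true; // pretend metadata arrives here
        }
        System.out.println("still buffered: " + buffer.size());
    }
}
```

The caller handles the exception up front and buffers, rather than polling a future; once metadata arrives, the buffered messages drain in order ahead of the new one.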
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627460#comment-14627460 ] Ben Kirwin commented on KAFKA-2260: --- Hi [~ewencp] -- thanks for the interest! I'd be glad to work this up into a KIP, but it looks like I don't have the permissions to create a wiki page... could you set that up? Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. 
- It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this change, the method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
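The expected-offset check proposed above is easy to model in miniature: the server appends only when the expected offset equals the next offset, with -1 preserving today's behaviour. A toy in-memory sketch (IllegalStateException stands in for the proposed 'CAS failure' error code; none of these names come from the Kafka codebase):

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectedOffsetSketch {
    static final List<String> log = new ArrayList<>();

    // "server" side: append only if the expected offset matches; -1 means "next available"
    static synchronized long produce(String msg, long expectedOffset) {
        long next = log.size();
        if (expectedOffset != -1 && expectedOffset != next)
            throw new IllegalStateException(
                "CAS failure: expected " + expectedOffset + ", actual " + next);
        log.add(msg);
        return next;
    }

    public static void main(String[] args) {
        long off = produce("state-v1", -1);  // current behaviour: take the next offset
        produce("state-v2", off + 1);        // CAS write: succeeds, nothing raced in
        try {
            produce("stale-update", off + 1); // same expectation again: must fail
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("log size: " + log.size());
    }
}
```

A retrying producer gets the at-most-once property described in the ticket for free: resending "state-v2" with the same expected offset can never append it twice.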
[jira] [Commented] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627292#comment-14627292 ] Edward Ribeiro commented on KAFKA-2210: --- Hi [~parth.brahmbhatt], left some minor comments on the review board. See if it makes sense. Cheers! KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch, KAFKA-2210_2015-07-14_10:02:19.patch, KAFKA-2210_2015-07-14_14:13:19.patch This is the first subtask for Kafka-1688. As Part of this jira we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] KIP-26 Add Copycat connector framework for data import/export
+1. Thanks Ewen!! On Tue, Jul 14, 2015 at 3:01 PM, Neha Narkhede n...@confluent.io wrote: +1 (binding) Thanks Ewen for taking on something that the Kafka project has long waited for! On Tue, Jul 14, 2015 at 2:58 PM, Jay Kreps j...@confluent.io wrote: +1 Super excited! -Jay On Tue, Jul 14, 2015 at 2:09 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Hi all, Let's start a vote on KIP-26: Add Copycat connector framework for data import/export For reference, here's the wiki: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767 And the mailing list thread (split across two months): http://mail-archives.apache.org/mod_mbox/kafka-dev/201506.mbox/%3CCAE1jLMOEJjnorFK5CtR3g-n%3Dm_AkrFsYeccsB4QimTRfGBrAGQ%40mail.gmail.com%3E http://mail-archives.apache.org/mod_mbox/kafka-dev/201507.mbox/%3CCAHwHRrUeNh%2BnCHwCTUCrcipHM3Po0ECUysO%2B%3DX3nwUeOGrcgdw%40mail.gmail.com%3E Just to clarify since this is a bit different from the KIPs voted on so far, the KIP just covers including Copycat in Kafka (rather than having it as a separate project). While the KIP aimed to be clear about the exact scope, the details require further discussion. The aim is to include some connectors as well, at a minimum for demonstration purposes, but the expectation is that connector development will, by necessity, be federated. I'll kick it off with a +1 (non-binding). -- Thanks, Ewen -- Thanks, Neha -- -- Guozhang
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627474#comment-14627474 ] Joel Koshy commented on KAFKA-1835: --- My earlier comment was more to clarify the reasoning. i.e., the statement that users that want non-blocking behavior are okay with losing messages which isn't necessarily true. I'm actually not opposed to the approach of going with max.block.ms == 0 for the non-blocking case and throwing an exception on metadata not being ready. I thought we even covered that option at the hangout (but Becket wasn't terribly excited about exposing any metadata-related exception in the first send to the user). From my p.o.v I don't think that's an issue since users cannot really escape from knowing about partitions and thus metadata. This (future API) was just an alternate approach that came out of a conversation with [~onurkaraman] in KAFKA-2275. We have a need for list-topics (for some use cases such as ETL). We could just go with a blocking API there or a config-driven (max.block.ms type) approach as we are considering currently in the producer. Now that we are working through the change in the consumer API to make the commit return a future we felt this could benefit from a similar pattern. And if we are considering it for the consumer it would be beneficial to have an identical API in the producer that also helps address the blocking concerns that people have raised. Personally I like the future-oriented API since (i) it makes it clear to the caller that some asynchronous handling is involved and the result can be collected from the future (ii) has clear semantics on blocking - the user can decide (at runtime) whether (or how much) blocking is acceptable. The other approach to go with max.block.ms==0 and throw an exception would also work but to be equally clear in the API, the API should throw a checked exception (which is again an API change). 
Yes, it involves an API change - but I don't think we want to be completely closed to changes in the API of the new (okay, one-year-old) producer if everyone agrees that a better and clearer API exists. I agree this is a pain and should be weighed carefully. The barrier for change is much lower on the new consumer, but I would be surprised if the majority of users were opposed to any API change in the new producer even at this stage, as long as it is in the right direction and older APIs are deprecated over time.

Kafka new producer needs options to make blocking behavior explicit
---
Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h

The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings:

pre.initialize.topics=x,y,z
pre.initialize.timeout=x

This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available:

- Fail creation of the client
- Fail all sends until the meta is available

Open to input on how the above option should be expressed.
It is also worth noting that more nuanced solutions exist that could work without the extra settings; they just end up having extra complications and at the end of the day don't add much value. For instance, the producer could accept and queue messages (note: more complicated than I am making it sound, due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in Scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
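A minimal sketch of the fail-fast style discussed above (max.block.ms == 0: the send path raises immediately when metadata is absent instead of stalling the caller). All names here are hypothetical illustrations, not Kafka's actual producer API:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: NonBlockingSender and its metadata cache are
// illustrative stand-ins, not classes from the Kafka client.
public class NonBlockingSender {
    // topic -> partition count, populated asynchronously by a metadata fetcher
    private final ConcurrentHashMap<String, Integer> metadataCache = new ConcurrentHashMap<>();

    void send(String topic, byte[] payload) {
        Integer partitions = metadataCache.get(topic);
        if (partitions == null) {
            // With max.block.ms == 0 the send path never waits: the caller
            // gets an immediate, catchable signal instead of a stall.
            throw new IllegalStateException("metadata not ready for " + topic);
        }
        // ... partition the payload and enqueue it for the sender thread ...
    }

    public static void main(String[] args) {
        NonBlockingSender s = new NonBlockingSender();
        try {
            s.send("foo", new byte[0]);
        } catch (IllegalStateException e) {
            System.out.println("would retry or drop: " + e.getMessage());
        }
        s.metadataCache.put("foo", 4); // metadata arrived asynchronously
        s.send("foo", new byte[0]);
        System.out.println("sent after metadata");
    }
}
```

This is the trade-off the thread describes: the first send on a cold topic fails fast, and the application decides whether to retry, buffer, or drop.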
Re: Inquiry regarding unreviewed patch (KAFKA-1614)
Hi Kim,

I took a quick look at the patch. It seems all the information you need is already available in Kafka. There are several mBeans you can take a look at; they are in kafka.log and include log start/end offsets, size, and the number of log segments.

Thanks,
Jiangjie (Becket) Qin

On 7/13/15, 5:01 PM, Jisoo Kim jisoo@metamarkets.com wrote: Also, please let me know if there's a way for another program to know the amount of data each broker currently holds. Thanks, Jisoo

On Mon, Jul 13, 2015 at 4:59 PM, Jisoo Kim jisoo@metamarkets.com wrote: To whom it may concern, My coworker submitted a patch https://issues.apache.org/jira/browse/KAFKA-1614 about a year ago which enables JMX to report segment information, so the amount of data each broker has can be calculated through JMX polling. May I ask about the progress on reviewing the patch? I'd like to add a new feature that does a similar thing to Yahoo's Kafka Manager, and would greatly appreciate it if the patch could be applied to the repo, so Yahoo's Kafka Manager can display the information when using a Kafka version with the patch. Thanks! Regards, Jisoo
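For the JMX-polling approach Becket suggests, a sketch of summing the kafka.log Size mBeans looks roughly like the following. The ObjectName pattern is an assumption - the exact attribute and name layout varies between Kafka versions, so check the broker's mBean tree (e.g. with jconsole) before relying on it:

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

public class LogSizePoller {
    // Sums the "Value" attribute of every mBean matching the given pattern.
    // Pattern "kafka.log:type=Log,name=Size,*" is an assumed layout; adjust
    // to whatever your broker version actually registers.
    static long totalLogSize(MBeanServerConnection conn) throws Exception {
        ObjectName pattern = new ObjectName("kafka.log:type=Log,name=Size,*");
        long total = 0;
        for (ObjectName name : conn.queryNames(pattern, null)) {
            total += ((Number) conn.getAttribute(name, "Value")).longValue();
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        // In practice you would connect over RMI to the broker's JMX port
        // (JMXConnectorFactory + a service:jmx:rmi URL); here we query the
        // local platform server, which has no Kafka mBeans and so prints 0.
        System.out.println(totalLogSize(ManagementFactory.getPlatformMBeanServer()));
    }
}
```

Polling this periodically from a monitoring program gives the per-broker on-disk data size Jisoo is asking about, without any new patch.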
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627485#comment-14627485 ] Jiangjie Qin commented on KAFKA-1835:
---
My example for use case (3) is probably a bad one... However, partitionsFor() might also be called independently, not only for sending data, right? So if a user wants a completely non-blocking API, letting partitionsFor() block would not work. I think the main benefit of having partitionsFor() return a future is that it can cover both use case (2) and (3) dynamically, which have opposite blocking requirements on partitionsFor(). Not sure if it is just my imagined use case, but let's say in use case (3) the user also wants to pre-initialize the metadata, but after that they don't want to block on any call. Then they can do something like this:

{code}
// pre-initialize metadata; blocking is acceptable at this point
partitionsFor("foo").get();
try {
    send(record);
    ... // some other logic
    Future<List<PartitionInfo>> partitionInfo = partitionsFor("foo");
    if (partitionInfo.isDone()) {
        ... // do something with the partition info
    }
    ...
} catch (TimeoutException te) {
    // handle exception
}
{code}

So I guess returning a future gives the user the maximum freedom. That said, throwing an exception for non-blocking calls and letting the user handle it also sounds reasonable to me. Functionality wise, I don't think there is much difference between the above proposal and having an additional configuration of topic.metadata.preinit.list. The benefit of the configuration approach may be that it is a smaller and backward compatible change. Do you see any issue with adding that configuration?
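The future-returning pattern above can be made concrete with java.util.concurrent.CompletableFuture. This is a hypothetical sketch (MetadataClient and the sleep-free async fetch are stand-ins, not Kafka's API); it shows how the same future serves both the blocking and the non-blocking caller:

```java
import java.util.List;
import java.util.concurrent.*;

// Hypothetical future-based metadata API; a real client would issue a
// MetadataRequest to a broker instead of computing the answer locally.
public class MetadataClient {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    public CompletableFuture<List<String>> partitionsFor(String topic) {
        return CompletableFuture.supplyAsync(
                () -> List.of(topic + "-0", topic + "-1"), pool);
    }

    public static void main(String[] args) throws Exception {
        MetadataClient client = new MetadataClient();
        CompletableFuture<List<String>> f = client.partitionsFor("foo");
        // Caller chooses: block with an explicit bound (use case 2)...
        System.out.println(f.get(1, TimeUnit.SECONDS).size());
        // ...or stay completely non-blocking and just poll (use case 3).
        System.out.println(f.isDone());
        client.pool.shutdown();
    }
}
```

The blocking policy lives at each call site rather than in a producer-wide config, which is exactly the "max freedom" argument made in the comment.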
Kafka new producer needs options to make blocking behavior explicit
---
Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h
-- This message was sent by Atlassian JIRA (v6.3.4#6332)