[jira] [Created] (KAFKA-6680) Fix config initialization in DynamicBrokerConfig

2018-03-19 Thread Manikumar (JIRA)
Manikumar created KAFKA-6680:


 Summary: Fix config initialization in DynamicBrokerConfig
 Key: KAFKA-6680
 URL: https://issues.apache.org/jira/browse/KAFKA-6680
 Project: Kafka
  Issue Type: Bug
Reporter: Manikumar


The following issues were observed while testing the dynamic config update feature:

1. {{kafkaConfig}} doesn't get updated during {{initialize}} if there are no 
dynamic configs defined in ZK.

2. Update DynamicListenerConfig.validateReconfiguration() to check that the new 
listeners must be a subset of the listener map (a rough sketch follows below).
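
For reference, a minimal sketch of the subset check described in point 2. The 
names and signature below are assumptions for illustration, not the actual 
kafka.server.DynamicListenerConfig code:

{code:scala}
// Sketch only: Listener and the map type are simplified stand-ins.
case class Listener(name: String, host: String, port: Int)

def validateReconfiguration(newListeners: Seq[Listener],
                            currentListenerMap: Map[String, Listener]): Unit = {
  val unknown = newListeners.map(_.name).filterNot(currentListenerMap.contains)
  require(unknown.isEmpty,
    s"Listeners ${unknown.mkString(", ")} must be a subset of the current listener map")
}
{code}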





Re: [DISCUSS] KIP-248 Create New ConfigCommand That Uses The New AdminClient

2018-03-19 Thread Viktor Somogyi
Hi,

Since there were no additional comments on this KIP, I'd like to restart the vote
tomorrow.
If anyone has comments, please do raise them.

Thanks,
Viktor

On Mon, Feb 19, 2018 at 9:44 AM, Viktor Somogyi 
wrote:

> Hi Rajini,
>
> Thanks for the feedback, I've applied your points.
>
> Viktor
>
> On Wed, Feb 7, 2018 at 7:22 PM, Rajini Sivaram 
> wrote:
>
>> Hi Viktor,
>>
>> Thanks for the updates. Looks good, just a few minor comments:
>>
>>1. AdminOperation - could be AlterOperation since it is only applied to
>>'Alter'?
>>2. Don't think we need `Unknown` type to process old requests. We can
>>use `Set` as the default for alter requests with version 0.
>>    3. There is a typo in
>>    https://cwiki.apache.org/confluence/display/KAFKA/KIP-248+-+Create+New+ConfigCommand+That+Uses+The+New+AdminClient#KIP-248-CreateNewConfigCommandThatUsesTheNewAdminClient-AdminClientAPIs :
>>    AdminOperation enum has a constructor QuotaType.
>>
>>
>> On Wed, Feb 7, 2018 at 4:53 PM, Viktor Somogyi 
>> wrote:
>>
>> > Hi Rajini,
>> >
>> > I think it absolutely makes sense, and we could even do it for AlterQuotas
>> > as we will have the same problem there.
>> > Updated my KIP
>> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-248+-+Create+New+ConfigCommand+That+Uses+The+New+AdminClient>
>> > to reflect these changes:
>> > - proposed protocol changes
>> > - created an AdminOperation type to represent the Add/Set/Delete triplet.
>> > (Put in the org.apache.kafka.clients.admin package)
>> >
>> > Please let me know if I missed something that you thought otherwise.
>> >
>> > Regards,
>> > Viktor
>> >
>> >
>> > On Tue, Feb 6, 2018 at 1:31 PM, Rajini Sivaram > >
>> > wrote:
>> >
>> > > Hi Viktor,
>> > >
>> > > While implementing KAFKA-6494, I realised that there is a mismatch
>> > between
>> > > the --alter command of ConfigCommand and AlterConfigs request.
>> > > ConfigCommand uses --add-config and --delete-config to make
>> incremental
>> > > updates. --add-config reads all the configs from ZooKeeper and adds
>> the
>> > > delta provided on top of that. AlterConfigs request currently sets the
>> > > whole properties object, so you need to know the full set of
>> properties
>> > of
>> > > an entity to use AlterConfigs request through the AdminClient. We
>> don't
>> > > allow sensitive configs to be read using AdminClient, so we can't read
>> > and
>> > > add configs as we do with ZooKeeper. So we need a protocol change to
>> make
>> > > this work. I didn't want to make this change after KIP freeze, so
>> perhaps
>> > > we could include this in your KIP? We could perhaps add a mode
>> > > (SET/ADD/DELETE) for AlterConfigs request where SET matches the
>> existing
>> > > behaviour for backward compatibility and ConfigCommand uses
>> ADD/DELETE.
>> > >
>> > > Thoughts?
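
(For illustration, a rough Scala sketch of the Set/Add/Delete mode discussed
above; the names come from this thread and are not a final API.)

// Sketch only: the operation values and the ConfigCommand mapping are
// assumptions based on this discussion, not the final protocol.
sealed trait AlterOperation
object AlterOperation {
  case object Set    extends AlterOperation // replace the whole config (current v0 behaviour)
  case object Add    extends AlterOperation // upsert only the listed keys
  case object Delete extends AlterOperation // remove only the listed keys
}

// ConfigCommand would then map its flags to incremental operations, e.g.
//   --add-config retention.ms=100  ->  (AlterOperation.Add,    Map("retention.ms" -> "100"))
//   --delete-config retention.ms   ->  (AlterOperation.Delete, Map("retention.ms" -> ""))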
>> > >
>> > > Regards,
>> > >
>> > > Rajini
>> > >
>> > > On Fri, Jan 19, 2018 at 12:57 PM, Viktor Somogyi <
>> > viktorsomo...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Rajini,
>> > > >
>> > > > Ok, I think I got you. I wasn't accounting for the fact that the parent
>> > > > might not be set, therefore it could be a default user as well or even
>> > > > the default client if nothing else is set (supposing we're talking about
>> > > > the <user, client-id> example). So if I'm correct, the quota will be
>> > > > applied in the order of the above points. In this case your argument is
>> > > > absolutely valid. I'll modify the QuotaSource.
>> > > >
>> > > > About your last point: yes, I was hesitating a lot. I thought the
>> > > > interface would be simpler, but after removing the helpers it's not
>> > > > that scary after all :).
>> > > > I'll start the vote.
>> > > >
>> > > > Viktor
>> > > >
>> > > >
>> > > > On Thu, Jan 18, 2018 at 7:59 PM, Rajini Sivaram <
>> > rajinisiva...@gmail.com
>> > > >
>> > > > wrote:
>> > > >
>> > > > > Hi Viktor,
>> > > > >
>> > > > > Thanks for the updates.
>> > > > >
>> > > > > *QuotaSource* currently has *Self/Default/Parent*. Not sure if that
>> > > > > is sufficient.
>> > > > > For the entity <user, client-id>, quota could be used from any of
>> > > > > these configs:
>> > > > >
>> > > > >1. /config/users/<user>/clients/<client-id>
>> > > > >2. /config/users/<user>/clients/<default>
>> > > > >3. /config/users/<user>
>> > > > >4. /config/users/<default>/clients/<client-id>
>> > > > >5. /config/users/<default>/clients/<default>
>> > > > >6. /config/users/<default>
>> > > > >7. /config/clients/<client-id>
>> > > > >8. /config/clients/<default>
>> > > > >
>> > > > > So perhaps we should have a *QuotaSource* entry for each of these
>> > > > > eight?
>> > > > >
>> > > > > A couple of minor points:
>> > > > >
>> > > > >- *Help Message* still uses --config.properties
>> > > > >- The other AdminClient APIs don't use aliases for various
>> > > > collections.
>> > > > >So not sure if we need the aliases here. I 

Build failed in Jenkins: kafka-trunk-jdk8 #2489

2018-03-19 Thread Apache Jenkins Server
See 


Changes:

[mjsax] KAFKA-6486: Implemented LinkedHashMap in TimeWindows (#4628)

--
[...truncated 3.55 MB...]

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfBootstrapServersIsNotSet STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfBootstrapServersIsNotSet PASSED

org.apache.kafka.streams.StreamsConfigTest > 
consumerConfigMustUseAdminClientConfigForRetries STARTED

org.apache.kafka.streams.StreamsConfigTest > 
consumerConfigMustUseAdminClientConfigForRetries PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedRestoreConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedRestoreConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedRestoreConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedRestoreConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfApplicationIdIsNotSet STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfApplicationIdIsNotSet PASSED

org.apache.kafka.streams.StreamsConfigTest > 
consumerConfigMustContainStreamPartitionAssignorConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
consumerConfigMustContainStreamPartitionAssignorConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled
 STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled
 PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyCorrectKeySerdeClassOnError STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyCorrectKeySerdeClassOnError PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testShiftOffsetByWhenBetweenBeginningAndEndOffset STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testShiftOffsetByWhenBetweenBeginningAndEndOffset PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetToSpecificOffsetWhenAfterEndOffset STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetToSpecificOffsetWhenAfterEndOffset PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
shouldAcceptValidDateFormats STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
shouldAcceptValidDateFormats PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > shouldDeleteTopic STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > shouldDeleteTopic PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testShiftOffsetByWhenAfterEndOffset STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testShiftOffsetByWhenAfterEndOffset PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetToSpecificOffsetWhenBeforeBeginningOffset STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetToSpecificOffsetWhenBeforeBeginningOffset PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > shouldSeekToEndOffset 
STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > shouldSeekToEndOffset 
PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetUsingPlanWhenBetweenBeginningAndEndOffset STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetUsingPlanWhenBetweenBeginningAndEndOffset PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetUsingPlanWhenBeforeBeginningOffset STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetUsingPlanWhenBeforeBeginningOffset PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
shouldThrowOnInvalidDateFormat STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
shouldThrowOnInvalidDateFormat PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetUsingPlanWhenAfterEndOffset STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testResetUsingPlanWhenAfterEndOffset PASSED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testShiftOffsetByWhenBeforeBeginningOffset STARTED

org.apache.kafka.streams.tools.StreamsResetterTest > 
testShiftOffsetByWhenBeforeBeginningOffset PASSED

org.apache.kafka.streams.StreamsBuilderTest > testMerge STARTED

org.apache.kafka.streams.StreamsBuilderTest > testMerge PASSED


Re: [DISCUSS] KIP-240: AdminClient.listReassignments AdminClient.describeReassignments

2018-03-19 Thread Tom Bentley
Last week I was able to spend a bit of time working on KIP-236 again and,
based on the discussion about that with Jun back in December, I refactored
the controller to store the reassignment state in /brokers/topics/${topic}
instead of introducing new ZK nodes. This morning I was wondering what to
do as a next step, as these changes are more or less useless on their own,
without APIs for discovering the current partitions and/or reassigning
partitions. I started thinking again about this KIP, and realised that
using an internal compacted topic (say __partition_reassignments), as
suggested by Steven and Colin, would require changes in basically the same
places.

Thinking through some of the failure modes ("what if I update ZK, but can't
produce to the topic?") I realised that it would actually be possible to stop
storing this info in ZK entirely and just store this state in the
__partition_reassignments topic. Doing it that way would eliminate those failure
modes and would allow clients interested in reassignment completion to consume
from this topic and respond to records published with a null value (indicating
completion of a reassignment).
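
As a rough sketch (the topic name comes from this thread; the record layout,
with the partition as key and a null value marking completion, is my
assumption), such a watcher could look like this in Scala:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "reassignment-watcher")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

// Consume the proposed compacted topic and treat tombstones as "reassignment done".
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("__partition_reassignments"))
while (true) {
  for (record <- consumer.poll(1000L).asScala) {
    if (record.value == null) println(s"reassignment of ${record.key} completed")
    else println(s"reassignment of ${record.key} in progress: ${record.value}")
  }
}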

There are some interesting implications to doing this:

1. This __partition_reassignments topic would need to be replicated in order
for reassignment to remain available (if the leader of a partition of
__partition_reassignments is not available, then the partitions whose state is
held by that partition could not be reassigned).
2. We would want to avoid unclean leader election for this topic.

I am interested in what other people think about this approach.

Cheers,

Tom


On 9 January 2018 at 21:18, Colin McCabe  wrote:

> What if we had an internal topic which watchers could listen to for
> information about partition reassignments?  The information could be in
> JSON, so if we want to add new fields later, we always could.
>
> This avoids introducing a new AdminClient API.  For clients that want to
> be notified about partition reassignments in a timely fashion, this avoids
> the "polling an AdminClient API in a tight loop" antipattern.  It allows
> watchers to be notified in a simple and natural way about what is going
> on.  Access can be controlled by the existing topic ACL mechanisms.
>
> best,
> Colin
>
>
> On Fri, Dec 22, 2017, at 06:48, Tom Bentley wrote:
> > Hi Steven,
> >
> > I must admit that I hadn't really considered that option. I can see how
> > attractive it is from your perspective. In practice it would come with
> > lots of edge cases which would need to be thought through:
> >
> > 1. What happens if the controller can't produce a record to this topic
> > because the partition's leader is unavailable?
> > 2. One solution to that is for the topic to be replicated on every
> broker,
> > so that the controller could elect itself leader on controller failover.
> > But that raises another problem: What if, upon controller failover, the
> > controller is ineligible for leader election because it's not in the ISR?
> > 3. The above questions suggest the controller might not always be able to
> > produce to the topic, but the controller isn't able to control when other
> > brokers catch up replicating moved partitions and has to deal with those
> > events. The controller would have to record (in memory) that the
> > reassignment was complete, but hadn't been published, and publish later,
> > when it was able to.
> > 4. Further to 3, we would need to recover the in-memory state of
> > reassignments on controller failover. But now we have to consider what
> > happens if the controller cannot *consume* from the topic.
> >
> > This seems pretty complicated to me. I think each of the above points has
> > alternatives (or compromises) which might make the problem more
> tractable,
> > so I'd welcome hearing from anyone who has ideas on that. In particular
> > there are parallels with consumer offsets which might be worth thinking
> > about some more.
> >
> > It would be useful to define better the use case we're trying to cater to
> > here.
> >
> > * Is it just a notification that a given reassignment has finished that
> > you're interested in?
> > * What are the consequences if such a notification is delayed, or dropped
> > entirely?
> >
> > Regards,
> >
> > Tom
> >
> >
> >
> > On 19 December 2017 at 20:34, Steven Aerts 
> wrote:
> >
> > > Hello Tom,
> > >
> > >
> > > when you were working out KIP-236, did you consider migrating the
> > > reassignment
> > > state from zookeeper to an internal kafka topic, keyed by partition
> > > and log compacted?
> > >
> > > It would allow an admin client and controller to easily subscribe for
> > > those changes,
> > > without the need to extend the network protocol as discussed in
> KIP-240.
> > >
> > > This is just a theoretical idea I wanted to share, as I can't find a
> > > reason why it would
> 

[jira] [Created] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"

2018-03-19 Thread Chema Sanchez (JIRA)
Chema Sanchez created KAFKA-6683:


 Summary: ReplicaFetcher crashes with "Attempted to complete a 
transaction which was not started" 
 Key: KAFKA-6683
 URL: https://issues.apache.org/jira/browse/KAFKA-6683
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 1.0.0
 Environment: os: GNU/Linux 
arch: x86_64
Kernel: 4.9.77
jvm: OpenJDK 1.8.0
Reporter: Chema Sanchez
 Attachments: server.properties

We have been experiencing this issue lately when restarting or replacing 
brokers of our Kafka clusters during maintenance operations.

After a broker is restarted or replaced, it may perform normally for some minutes 
and then suddenly throw the following exception and stop replicating some 
partitions:

{code:none}
[2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, 
fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
java.lang.IllegalArgumentException: Attempted to complete a transaction which 
was not started
    at 
kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
    at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
    at kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at 
scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
    at 
scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
    at kafka.log.Log.loadProducersFromLog(Log.scala:540)
    at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
    at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
    at scala.collection.Iterator.foreach(Iterator.scala:929)
    at scala.collection.Iterator.foreach$(Iterator.scala:929)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
    at scala.collection.IterableLike.foreach(IterableLike.scala:71)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.log.Log.loadProducerState(Log.scala:514)
    at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
    at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
    at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
    at kafka.log.Log.truncateTo(Log.scala:1467)
    at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
    at 
kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
    at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
    at kafka.log.LogManager.truncateTo(LogManager.scala:445)
    at 
kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
    at scala.collection.Iterator.foreach(Iterator.scala:929)
    at scala.collection.Iterator.foreach$(Iterator.scala:929)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
    at scala.collection.IterableLike.foreach(IterableLike.scala:71)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at 
kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
    at 
kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
    at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
    at 
kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
    at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, 
fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
{code}

Since all brokers in a cluster are restarted during system updates, the issue has 
sometimes manifested in different brokers holding replicas for the same partition 
at the same time, which caused downtime because those partitions went offline.

It is necessary to restart the faulted broker in order to recover partition 
replication, but after hitting this issue we often find that, once restarted, the 
broker shuts itself down with the following error among lots of warnings due to 
corrupted indices:

{code:none}
[2018-03-05 16:02:22,450] ERROR There was an error in one of the threads during 
logs 

Re: [DISCUSS] KIP-270 A Scala wrapper library for Kafka Streams

2018-03-19 Thread Sean Glover
Type names: I vote for option 2.  The user must explicitly add a dependency
to this library and the wrapper types are in a different package.  It seems
reasonable to expect them to do an import rename if there's a need to drop
down to the Java API.
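
For example (the Scala package name is assumed from this KIP's proposal), mixing
the two APIs under option 2 would only need a rename on one of the imports:

import org.apache.kafka.streams.scala.kstream.{KStream => KStreamS} // proposed Scala wrapper
import org.apache.kafka.streams.kstream.KStream                     // Java DSL

// val wrapped: KStreamS[String, Long] = ...
// val underlying: KStream[String, Long] = ... // handed to code written against the Java API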

Test Utils: The test utils in kafka-streams-scala are nice and lean, but
I'm not sure if it provides much more value than other options that exist
in the community.  There's an embedded Kafka/ZK project implementation for
ScalaTest that's popular and active: manub/scalatest-embedded-kafka.  It
implies you must also use ScalaTest, which I acknowledge isn't everyone's
first choice for Scala test framework, but it probably is one of, if not
the most, popular library.  It includes a DSL for Kafka Streams too.  If
this KIP is accepted then perhaps a PR to that project could be made to
support the new wrapper implementations.

https://github.com/manub/scalatest-embedded-kafka#scalatest-embedded-kafka-streams
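
For reference, a rough sketch of how that library is typically used (API names
are from memory and may differ between versions):

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{Matchers, WordSpec}

class MyTopologySpec extends WordSpec with Matchers with EmbeddedKafka {
  // Ports are arbitrary; the embedded broker and ZK are started per test block.
  implicit val kafkaConfig = EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)

  "my topology" should {
    "process a record end to end" in {
      withRunningKafka {
        publishStringMessageToKafka("input-topic", "hello")
        // ... start the Kafka Streams topology under test here ...
        // consumeFirstStringMessageFrom("output-topic") shouldBe "HELLO"
      }
    }
  }
}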

Sean

On Sun, Mar 18, 2018 at 4:05 AM, Debasish Ghosh <
debasish.gh...@lightbend.com> wrote:

> >
> > > Should this be 1.2  (maybe it's even better to not put any version at
> > all)
>
>
> Actually wanted to emphasize that the support is from 1.0.0 onwards ..
> Should we make that explicit ? Like ..
>
> kafka-streams-scala only depends on the Scala standard library and Kafka
> > Streams 1.0.0+.
>
>
>  In 1.1 release, we add a new module `kafka-streams-test-utils` to simplify
> > testing for Kafka Streams applications. Are those test utils suitable for
> > Scala users or should we add Scala wrappers for those, too?
>
>
> I will check up and let you know ..
>
> Also I am not clear about the decision on renaming of Scala abstractions.
> Can we have a consensus on this ? Here's the summary ..
>
> *Option 1:* Keep names separate (KStream for Java class, KStreamS for
> Scala). No renaming of imports required.
> *Option 2:* Unify names (KStream for Java and Scala class names). No
> conflict since they will reside in different packages. But if we need to
> use both abstractions, renaming of imports are required. But again, this
> may not be a too frequent use case.
>
> Suggestions ?
>
> regards.
>
> On Sat, Mar 17, 2018 at 3:07 AM, Matthias J. Sax 
> wrote:
>
> > Thanks a lot for the KIP! Two questions:
> >
> > 1) the KIP states:
> >
> > > kafka-streams-scala only depends on the Scala standard library and
> Kafka
> > Streams 1.0.0.
> >
> > Should this be 1.2  (maybe it's even better to not put any version at
> all)
> >
> >
> > 2) In 1.1 release, we add a new module `kafka-streams-test-utils` to
> > simplify testing for Kafka Streams applications. Are those test utils
> > suitable for Scala users or should we add Scala wrappers for those, too?
> >
> >
> > -Matthias
> >
> >
> > On 3/16/18 11:10 AM, Ted Yu wrote:
> > > Import renames seem to be fine.
> > >
> > > The class names with trailing 'S' look clean.
> > >
> > > Cheers
> > >
> > > On Fri, Mar 16, 2018 at 11:04 AM, Ismael Juma 
> wrote:
> > >
> > >> If this is rare (as it sounds), relying on import renames seems fine
> to
> > me.
> > >> Let's see what others think.
> > >>
> > >> Ismael
> > >>
> > >> On Fri, Mar 16, 2018 at 10:51 AM, Debasish Ghosh <
> > >> debasish.gh...@lightbend.com> wrote:
> > >>
> > >>> I am not sure if this is practical or not. But theoretically a user
> may
> > >>> want to extract the unsafe Java abstraction from the Scala ones and
> use
> > >>> Java APIs on them .. e.g.
> > >>>
> > >>> val userClicksStream: KStreamS[String, Long] =
> > >>> builder.stream(userClicksTopic) // Scala abstraction
> > >>>
> > >>> val jStream: KStream[String, Long] = userClicksStream.inner //
> > publishes
> > >>> the underlying Java abstraction
> > >>>
> > >>> //.. work with Java, may be pass to some function written in Java
> > >>>
> > >>> I do realize this is somewhat of a convoluted use case and may not be
> > >>> practically useful ..
> > >>>
> > >>> Otherwise we can very well work on the suggested approach of unifying
> > the
> > >>> names ..
> > >>>
> > >>> regards.
> > >>>
> > >>>
> > >>>
> > >>> On Fri, Mar 16, 2018 at 10:28 PM, Ismael Juma 
> > wrote:
> > >>>
> >  What does "mixed mode application" mean? What are the cases where a
> > >> user
> >  would want to use both APIs? I think that would help understand the
> >  reasoning.
> > 
> >  Thanks,
> >  Ismael
> > 
> >  On Fri, Mar 16, 2018 at 8:48 AM, Debasish Ghosh <
> >  debasish.gh...@lightbend.com> wrote:
> > 
> > > Hi Damian -
> > >
> > > We could. But in case the user wants to use both Scala and Java
> APIs
> > >>> (may
> > > be for some mixed mode application), won't that be confusing ? She
> > >> will
> > > have to do something like ..
> > >
> > > import o.a.k.s.scala.{KStream => KStreamS}
> > >
> > > to rename Scala imports or the other way round for imported Java
> > >>> classes.
> > >
> > > regards.
> > 

[jira] [Created] (KAFKA-6682) Kafka reconnection after broker restart

2018-03-19 Thread JIRA
Tomasz Gąska created KAFKA-6682:
---

 Summary: Kafka reconnection after broker restart
 Key: KAFKA-6682
 URL: https://issues.apache.org/jira/browse/KAFKA-6682
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.0
Reporter: Tomasz Gąska


I am using the Kafka producer plugin for logback (danielwegener) with the clients 
library 1.0.0, and after a restart of the broker all my JVMs connected to it get 
tons of these exceptions:
{code:java}
11:22:48.738 [kafka-producer-network-thread | app-logback-relaxed] cid: 
clid: E [    @] a: o.a.k.c.p.internals.Sender - [Producer 
clientId=id-id-logback-relaxed] Uncaught error in kafka producer I/O 
thread:  ex:java.lang.NullPointerException: null
    at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:436)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:399)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:798){code}
During the restart there are still other brokers available behind the LB.

It doesn't matter that Kafka is up again; only restarting the JVM helps.
{code:java}
<!-- logback KafkaAppender configuration (XML markup not preserved in the archive); recoverable settings: -->

layout pattern: %date{"-MM-dd'T'HH:mm:ss.SSS'Z'"} ${HOSTNAME} [%thread] %logger{32} - %message ex:%exf%n
topic: mytopichere

producer configs:
    bootstrap.servers=10.99.99.1:9092
    acks=0
    block.on.buffer.full=false
    client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed
    compression.type=none
    max.block.ms=0
{code}

I provide the load balancer address in bootstrap.servers here. There are three 
Kafka brokers behind it.
{code:java}
java version "1.7.0"
Java(TM) SE Runtime Environment (build pap6470sr9fp60ifix-20161110_01(SR9 
FP60)+IV90630+IV90578))
IBM J9 VM (build 2.6, JRE 1.7.0 AIX ppc64-64 Compressed References 
20161005_321282 (JIT enabled, AOT enabled)
J9VM - R26_Java726_SR9_20161005_1259_B321282
JIT  - tr.r11_20161001_125404
GC   - R26_Java726_SR9_20161005_1259_B321282_CMPRSS
J9CL - 20161005_321282)
JCL - 20161021_01 based on Oracle jdk7u121-b15{code}





[jira] [Created] (KAFKA-6684) Support casting values with bytes schema to string

2018-03-19 Thread Amit Sela (JIRA)
Amit Sela created KAFKA-6684:


 Summary: Support casting values with bytes schema to string 
 Key: KAFKA-6684
 URL: https://issues.apache.org/jira/browse/KAFKA-6684
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Amit Sela


Casting from BYTES is not supported, which means that casting LogicalTypes is 
not supported.

This proposes to allow casting anything to a string, kind of like Java's 
{{toString()}}, such that if the object is actually a LogicalType it can be 
"serialized" as string instead of bytes+schema.

Worst case, bytes are "cast" to whatever {{toString()}} returns - it's up 
to the user to know the data.

This would help when using a JSON sink, or anything that's not Avro.
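
For illustration, a minimal sketch of the proposed fallback behaviour (this is
not the actual Connect Cast transform code, just the idea in Scala):

{code:scala}
import java.util.Base64

// Fall back to toString() when no narrower cast applies, so logical types and
// raw bytes become readable for non-Avro sinks such as a JSON sink.
def castToString(value: Any): String = value match {
  case null               => null
  case bytes: Array[Byte] => Base64.getEncoder.encodeToString(bytes) // or new String(bytes) for text payloads
  case other              => other.toString                          // covers BigDecimal, java.util.Date, ...
}
{code}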





[jira] [Created] (KAFKA-6685) Connect deserialization log message should distinguish key from value

2018-03-19 Thread Yeva Byzek (JIRA)
Yeva Byzek created KAFKA-6685:
-

 Summary: Connect deserialization log message should distinguish 
key from value
 Key: KAFKA-6685
 URL: https://issues.apache.org/jira/browse/KAFKA-6685
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yeva Byzek


Connect was configured for Avro key and value but data had String key and Avro 
value. The resulting error message was misleading because it didn't distinguish 
key from value, and so I was chasing problems with the value instead of the key.

tl;dr Connect should at least tell you whether the problem is with 
deserializing the key or value of a record

 





Build failed in Jenkins: kafka-trunk-jdk7 #3271

2018-03-19 Thread Apache Jenkins Server
See 


Changes:

[rajinisivaram] KAFKA-6676: Ensure Kafka chroot exists in system tests and use 
chroot on

--
[...truncated 1.53 MB...]
org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest
 PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingPattern STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingPattern PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingTopic STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingTopic PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets 
STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest
 STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest
 PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowStreamsExceptionNoResetSpecified STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowStreamsExceptionNoResetSpecified PASSED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled STARTED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled PASSED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldRestoreState STARTED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldRestoreState PASSED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldSuccessfullyStartWhenLoggingDisabled STARTED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldSuccessfullyStartWhenLoggingDisabled PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenInputTopicAbsent STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenInputTopicAbsent PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenIntermediateTopicAbsent STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenIntermediateTopicAbsent PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingByDurationAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingByDurationAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromFileAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromFileAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhileStreamsRunning STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhileStreamsRunning PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated STARTED


Build failed in Jenkins: kafka-1.1-jdk7 #90

2018-03-19 Thread Apache Jenkins Server
See 


Changes:

[rajinisivaram] KAFKA-6676: Ensure Kafka chroot exists in system tests and use 
chroot on

--
[...truncated 1.68 MB...]
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = false] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = false] PASSED
ERROR: Could not install GRADLE_3_5_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:895)
at 

Jenkins build is back to normal : kafka-trunk-jdk9 #495

2018-03-19 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-6689) Kafka not release .deleted file.

2018-03-19 Thread A (JIRA)
A created KAFKA-6689:


 Summary: Kafka not release .deleted file.
 Key: KAFKA-6689
 URL: https://issues.apache.org/jira/browse/KAFKA-6689
 Project: Kafka
  Issue Type: Bug
  Components: config, controller, log
Affects Versions: 0.10.1.1
 Environment: 3 Kafka Broker running on CentOS6.9(rungi3 VMs) 
Reporter: A
 Fix For: 0.10.1.1


After Kafka cleaned up .timeindex / .index log files based on topic retention, I 
can still lsof a lot of .index.deleted and .timeindex.deleted files.

We have 3 brokers on 3 VMs; it happens on only 2 of the brokers.

[broker-01 ~]$ lsof -p 24324 | grep deleted | wc -l
28

[broker-02 ~]$ lsof -p 12131 | grep deleted | wc -l
14599

[broker-03 ~]$ lsof -p 3349 | grep deleted | wc -l
14523

 

Configuration on all 3 brokers is the same (roll log every hour, retention time 
11 hours).
 * INFO KafkaConfig values:
 advertised.host.name = null
 advertised.listeners = PLAINTEXT://Broker-02:9092
 advertised.port = null
 authorizer.class.name =
 auto.create.topics.enable = true
 auto.leader.rebalance.enable = true
 background.threads = 10
 broker.id = 2
 broker.id.generation.enable = true
 broker.rack = null
 compression.type = producer
 connections.max.idle.ms = 60
 controlled.shutdown.enable = true
 controlled.shutdown.max.retries = 3
 controlled.shutdown.retry.backoff.ms = 5000
 controller.socket.timeout.ms = 3
 default.replication.factor = 3
 delete.topic.enable = true
 fetch.purgatory.purge.interval.requests = 1000
 group.max.session.timeout.ms = 30
 group.min.session.timeout.ms = 6000
 host.name =
 inter.broker.protocol.version = 0.10.1-IV2
 leader.imbalance.check.interval.seconds = 300
 leader.imbalance.per.broker.percentage = 10
 listeners = null
 log.cleaner.backoff.ms = 15000
 log.cleaner.dedupe.buffer.size = 134217728
 log.cleaner.delete.retention.ms = 8640
 log.cleaner.enable = true
 log.cleaner.io.buffer.load.factor = 0.9
 log.cleaner.io.buffer.size = 524288
 log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
 log.cleaner.min.cleanable.ratio = 0.5
 log.cleaner.min.compaction.lag.ms = 0
 log.cleaner.threads = 1
 log.cleanup.policy = [delete]
 log.dir = /tmp/kafka-logs
 log.dirs = /data/appdata/kafka/data
 log.flush.interval.messages = 9223372036854775807
 log.flush.interval.ms = null
 log.flush.offset.checkpoint.interval.ms = 6
 log.flush.scheduler.interval.ms = 9223372036854775807
 log.index.interval.bytes = 4096
 log.index.size.max.bytes = 10485760
 log.message.format.version = 0.10.1-IV2
 log.message.timestamp.difference.max.ms = 9223372036854775807
 log.message.timestamp.type = CreateTime
 log.preallocate = false
 log.retention.bytes = -1
 log.retention.check.interval.ms = 30
 log.retention.hours = 11
 log.retention.minutes = 660
 log.retention.ms = 3960
 log.roll.hours = 1
 log.roll.jitter.hours = 0
 log.roll.jitter.ms = null
 log.roll.ms = null
 log.segment.bytes = 1073741824
 log.segment.delete.delay.ms = 6
 max.connections.per.ip = 2147483647
 max.connections.per.ip.overrides =
 message.max.bytes = 112
 metric.reporters = []
 metrics.num.samples = 2
 metrics.sample.window.ms = 3
 min.insync.replicas = 2
 num.io.threads = 16
 num.network.threads = 16
 num.partitions = 10
 num.recovery.threads.per.data.dir = 3
 num.replica.fetchers = 1
 offset.metadata.max.bytes = 4096
 offsets.commit.required.acks = -1
 offsets.commit.timeout.ms = 5000
 offsets.load.buffer.size = 5242880
 offsets.retention.check.interval.ms = 60
 offsets.retention.minutes = 1440
 offsets.topic.compression.codec = 0
 offsets.topic.num.partitions = 50
 offsets.topic.replication.factor = 3
 offsets.topic.segment.bytes = 104857600
 port = 9092
 principal.builder.class = class 
org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
 producer.purgatory.purge.interval.requests = 1000
 queued.max.requests = 3
 quota.consumer.default = 9223372036854775807
 quota.producer.default = 9223372036854775807
 quota.window.num = 11
 quota.window.size.seconds = 1
 replica.fetch.backoff.ms = 1000
 replica.fetch.max.bytes = 1048576
 replica.fetch.min.bytes = 1
 replica.fetch.response.max.bytes = 10485760
 replica.fetch.wait.max.ms = 500
 replica.high.watermark.checkpoint.interval.ms = 5000
 replica.lag.time.max.ms = 1
 replica.socket.receive.buffer.bytes = 65536
 replica.socket.timeout.ms = 3
 replication.quota.window.num = 11
 replication.quota.window.size.seconds = 1
 request.timeout.ms = 3
 reserved.broker.max.id = 1000
 sasl.enabled.mechanisms = [GSSAPI]
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 sasl.kerberos.min.time.before.relogin = 6
 sasl.kerberos.principal.to.local.rules = [DEFAULT]
 sasl.kerberos.service.name = null
 sasl.kerberos.ticket.renew.jitter = 0.05
 sasl.kerberos.ticket.renew.window.factor = 0.8
 sasl.mechanism.inter.broker.protocol = GSSAPI
 security.inter.broker.protocol = PLAINTEXT
 

Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-03-19 Thread Jay Kreps
Optimizing startup seems really valuable but I'm a little confused by this.

There are two different things:
1. Recovery
2. Sanity check

The terminology we're using is a bit mixed here.

Recovery means checksumming the log segments and rebuilding the index on a
hard crash. This only happens on unflushed segments, which is generally
just the last segment. Recovery is essential for the correctness guarantees
of the log and you shouldn't disable it. It only happens on hard crash and
is not a factor in graceful restart. We can likely optimize it but that
would make most sense to do in a data driven fashion off some profiling.

However there is also a ton of disk activity that happens during
initialization (lots of checks on the file size, absolute path, etc). I
think these have crept in over time with people not really realizing this
code is perf sensitive and java hiding a lot of what is and isn't a file
operation. One part of this is the sanityCheck() call for the two indexes.
I don't think this call reads the full index, just the last entry in the
index, right? There should be no need to read the full index except during
recovery (and then only for the segments being recovered). I think it would
make a ton of sense to optimize this but I don't think that optimization
needs to be configurable as this is just a helpful sanity check to detect
common non-sensical things in the index files, but it isn't part of the
core guarantees, in general you aren't supposed to lose committed data from
disk, and if you do we may be able to fail faster but we fundamentally
can't really help you. Again I think this would make the most sense to do
in a data driven way, if you look at that code I think it is doing crazy
amounts of file operations (e.g. getAbsolutePath, file sizes, etc). I think
it'd make most sense to profile startup with a cold cache on a large log
directory and do the same with an strace to see how many redundant system
calls we do per segment and what is costing us and then cut some of this
out. I suspect we could speed up our startup time quite a lot if we did
that.

For example we have a bunch of calls like this:

require(len % entrySize == 0,
  "Index file " + file.getAbsolutePath + " is corrupt, found " + len +
  " bytes which is not positive or not a multiple of 8.")

I'm pretty sure file.getAbsolutePath is a system call and I assume that
happens whether or not you fail the in-memory check?
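
For comparison, a rough sketch (assumed entry layout, not the actual OffsetIndex
code) of a check that resolves the path once and reads only the last entry:

import java.io.File
import java.nio.ByteBuffer
import java.nio.file.Files

def sanityCheck(file: File, entrySize: Int): Unit = {
  val path = file.toPath                // resolve the path once, up front
  val len = Files.size(path)            // a single file-size call
  require(len % entrySize == 0,
    s"Index file $path is corrupt, found $len bytes which is not a multiple of $entrySize.")
  if (len >= entrySize) {               // inspect only the last entry, never the whole index
    val channel = Files.newByteChannel(path)
    try {
      channel.position(len - entrySize)
      val lastEntry = ByteBuffer.allocate(entrySize)
      while (lastEntry.hasRemaining && channel.read(lastEntry) > 0) {}
    } finally channel.close()
  }
}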

-Jay


On Sun, Feb 25, 2018 at 10:27 PM, Dong Lin  wrote:

> Hi all,
>
> I have created KIP-263: Allow broker to skip sanity check of inactive
> segments on broker startup. See
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 263%3A+Allow+broker+to+skip+sanity+check+of+inactive+
> segments+on+broker+startup
> .
>
> This KIP provides a way to significantly reduce time to rolling bounce a
> Kafka cluster.
>
> Comments are welcome!
>
> Thanks,
> Dong
>


[jira] [Created] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group

2018-03-19 Thread Narayan Periwal (JIRA)
Narayan Periwal created KAFKA-6681:
--

 Summary: Two instances of kafka consumer reading the same 
partition within a consumer group
 Key: KAFKA-6681
 URL: https://issues.apache.org/jira/browse/KAFKA-6681
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.2.1
Reporter: Narayan Periwal


We have seen this issue with the Kafka consumer, the new client library that was 
introduced in 0.9.

With this new client, group management is done by the Kafka coordinator, which 
is one of the Kafka brokers.

We are using Kafka broker 0.10.2.1, and the consumer client version is also 0.10.2.1.

The issue that we have faced is that, after rebalancing, some of the partitions 
get consumed by 2 instances within a consumer group, leading to duplication of 
the entire partition data. They continue to read until the next rebalancing, or 
the restart of those clients. 

It looks like a particular consumer keeps fetching data from a partition, but 
the broker is not able to identify this "stale" consumer instance. 

During this time, we also see the under-replicated partition metrics spiking. 

We have hit this twice in production. Please look into it at the earliest. 





Jenkins build is back to normal : kafka-trunk-jdk7 #3272

2018-03-19 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-270 A Scala wrapper library for Kafka Streams

2018-03-19 Thread Guozhang Wang
Hi Debasish,

You can work on the PR in parallel to the KIP discussion so that people can
start reviewing it. The only restriction is that the PR cannot be merged
until the KIP is accepted.


Guozhang


On Mon, Mar 19, 2018 at 11:09 AM, Debasish Ghosh <
debasish.gh...@lightbend.com> wrote:

> Based on this discussion I have a question ..
>
> For the actual implementation we need to have a PR on
> https://github.com/apache/kafka and the new library will be in the package
> structure org.apache.kafka.streams.scala. My question is, is this PR part
> of the KIP ? Or we work on the PR once the KIP is accepted ..
>
> regards.
>
> On Mon, Mar 19, 2018 at 11:28 PM, Debasish Ghosh <
> debasish.gh...@lightbend.com> wrote:
>
> > Thanks Guozhang for the comments .. we will start working on them ..
> >
> > regards.
> >
> > On Mon, Mar 19, 2018 at 11:02 PM, Guozhang Wang 
> > wrote:
> >
> >> And one more comment about the type safety:
> >>
> >> 7. I'm in favor of the approach of enforcing type safety at compile
> time,
> >> since with Scala's strong typing system I think it makes more sense to
> get
> >> rid of config-based serde specifications that can cause runtime error in
> >> the Scala wrapper.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Mar 19, 2018 at 10:28 AM, Guozhang Wang 
> >> wrote:
> >>
> >> > Hello Debasish,
> >> >
> >> > Thanks for the KIP. Here are some comments:
> >> >
> >> > 1. About the naming: I'd also vote for option 2 and require users to
> >> > rename on import, since this scenario seems rare to me.
> >> >
> >> > 2. About the dependency: since this KIP can only be merged as early as
> >> > 1.2, it means that a user who wants to use this artifact, say for
> >> > version 1.2, would need to bring in "kafka-streams" version 1.2.
> In
> >> > addition, if we change the Java API in the future versions we'd also
> >> need
> >> > to update the Scala wrapper at the same version as well, so in other
> >> words
> >> > "kafka-streams-scala" version X have to be with "kafka-streams"
> version
> >> X
> >> > anyways. So I'd suggest we remove the version number in
> >> "kafka-streams-scala
> >> > only depends on the Scala standard library and Kafka Streams" as it
> may
> >> > be confusing to readers.
> >> >
> >> > 3. For the KIP discussion, it needs to be based on the proposed state of
> >> this
> >> > project in AK. More specifically, for the groupId in maven / gradle,
> it
> >> > needs to be "org.apache.kafka"; for the version, it needs to be Kafka
> >> release
> >> > versions, e.g. 1.2.0.
> >> >
> >> > 4. In "New or Changed Public Interfaces" section, it does not mention
> >> the
> >> > serde classes like "DefaultSerdes", but I think they should also be
> >> > considered public APIs as users would most likely import these in her
> >> > implementation.
> >> >
> >> > 5. Could you also list the changes you'd propose to make to
> build.gradle
> >> > in kafka for adding this artifact? More details will help readers to
> >> better
> >> > understand your proposal.
> >> >
> >> > 6. I think it'd be good to have a WordCount example code as part
> of
> >> > this KIP, to illustrate how to code in this Scala wrapper, e.g. in
> >> > o.a.k.scala.examples. But for this class we probably do not need to
> >> have a
> >> > separate artifact for it as we did in kafka-streams-examples.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Mon, Mar 19, 2018 at 9:16 AM, Ted Yu  wrote:
> >> >
> >> >> I agree with Sean on name unification.
> >> >>
> >> >> +1 to option 2.
> >> >>
> >> >> On Mon, Mar 19, 2018 at 7:23 AM, Sean Glover <
> >> sean.glo...@lightbend.com>
> >> >> wrote:
> >> >>
> >> >> > Type names: I vote for option 2.  The user must explicitly add a
> >> >> dependency
> >> >> > to this library and the wrapper types are in a different package.
> It
> >> >> seems
> >> >> > reasonable to expect them to do an import rename if there's a need
> to
> >> >> drop
> >> >> > down to the Java API.
> >> >> >
> >> >> > Test Utils: The test utils in kafka-streams-scala are nice and
> lean,
> >> but
> >> >> > I'm not sure if it provides much more value than other options that
> >> >> exist
> >> >> > in the community.  There's an embedded Kafka/ZK project
> >> implementation
> >> >> for
> >> >> > ScalaTest that's popular and active: manub/scalatest-embedded-
> kakfa.
> >> >> It
> >> >> > implies you must also use ScalaTest, which I acknowledge isn't
> >> >> everyone's
> >> >> > first choice for Scala test framework, but it probably is one of,
> if
> >> not
> >> >> > the most, popular library.  It includes a DSL for Kafka Streams
> >> too.  If
> >> >> > this KIP is accepted then perhaps a PR to that project could be
> made
> >> to
> >> >> > support the new wrapper implementations.
> >> >> >
> >> >> > https://github.com/manub/scalatest-embedded-kafka#
> >> >> > scalatest-embedded-kafka-streams
> >> >> >
> >> >> > Sean
> >> >> >
> >> >> > On Sun, Mar 18, 2018 at 4:05 AM, 

Re: [DISCUSS] KIP-270 A Scala wrapper library for Kafka Streams

2018-03-19 Thread Debasish Ghosh
Cool .. so the changes that you suggested, e.g. the WordCount example, package
name changes etc., will be in the PR only and not in the KIP document ..

Or we keep the KIP document updated as well ?

regards.

On Mon, 19 Mar 2018 at 11:46 PM, Guozhang Wang  wrote:

> Hi Debasish,
>
> You can work on the PR in parallel to the KIP discussion so that people can
> start reviewing it. The only restriction is that the PR cannot be merged
> until the KIP is accepted.
>
>
> Guozhang
>
>
> On Mon, Mar 19, 2018 at 11:09 AM, Debasish Ghosh <
> debasish.gh...@lightbend.com> wrote:
>
> > Based on this discussion I have a question ..
> >
> > For the actual implementation we need to have a PR on
> > https://github.com/apache/kafka and the new library will be in the
> package
> > structure org.apache.kafka.streams.scala. My question is, is this PR part
> > of the KIP ? Or we work on the PR once the KIP is accepted ..
> >
> > regards.
> >
> > On Mon, Mar 19, 2018 at 11:28 PM, Debasish Ghosh <
> > debasish.gh...@lightbend.com> wrote:
> >
> > > Thanks Guozhang for the comments .. we will start working on them ..
> > >
> > > regards.
> > >
> > > On Mon, Mar 19, 2018 at 11:02 PM, Guozhang Wang 
> > > wrote:
> > >
> > >> And one more comment about the type safety:
> > >>
> > >> 7. I'm in favor of the approach of enforcing type safety at compile
> > time,
> > >> since with Scala's strong typing system I think it makes more sense to
> > get
> > >> rid of config-based serde specifications that can cause runtime error
> in
> > >> the Scala wrapper.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Mon, Mar 19, 2018 at 10:28 AM, Guozhang Wang 
> > >> wrote:
> > >>
> > >> > Hello Debasish,
> > >> >
> > >> > Thanks for the KIP. Here are some comments:
> > >> >
> > >> > 1. About the naming: I'd also vote for option 2 and require users to
> > >> > rename on import, since this scenario seems rare to me.
> > >> >
> > >> > 2. About the dependency: since this KIP can only be merged as early
> as
> > >> > 1.2, it means that a user who wants to use this artifact, say for
> > >> > version 1.2, would need to bring in "kafka-streams" version 1.2.
> > In
> > >> > addition, if we change the Java API in the future versions we'd also
> > >> need
> > >> > to update the Scala wrapper at the same version as well, so in other
> > >> words
> > >> > "kafka-streams-scala" version X have to be with "kafka-streams"
> > version
> > >> X
> > >> > anyways. So I'd suggest we remove the version number in
> > >> "kafka-streams-scala
> > >> > only depends on the Scala standard library and Kafka Streams" as it
> > may
> > >> > be confusing to readers.
> > >> >
> > >> > 3. For the KIP discussion, it needs to be based on the proposed state of
> > >> this
> > >> > project in AK. More specifically, for the groupId in maven / gradle,
> > it
> > >> > needs to be "org.apache.kafka"; for the version, it needs to be Kafka
> > >> release
> > >> > versions, e.g. 1.2.0.
> > >> >
> > >> > 4. In "New or Changed Public Interfaces" section, it does not
> mention
> > >> the
> > >> > serde classes like "DefaultSerdes", but I think they should also be
> > >> > considered public APIs as users would most likely import these in
> her
> > >> > implementation.
> > >> >
> > >> > 5. Could you also list the changes you'd propose to made to
> > build.gradle
> > >> > in kafka for adding this artifact? More details will help readers to
> > >> better
> > >> > understand your proposal.
> > >> >
> > >> > 6. I think it'd will be good to have a WordCount example code as
> part
> > of
> > >> > this KIP, to illustrate how to code in this Scala wrapper, e.g. in
> > >> > o.a.k.scala.examples. But for this class we probably do not need to
> > >> have a
> > >> > separate artifact for it as we did in kafka-streams-examples.
> > >> >
> > >> >
> > >> > Guozhang
> > >> >
> > >> >
> > >> > On Mon, Mar 19, 2018 at 9:16 AM, Ted Yu 
> wrote:
> > >> >
> > >> >> I agree with Sean on name unification.
> > >> >>
> > >> >> +1 to option 2.
> > >> >>
> > >> >> On Mon, Mar 19, 2018 at 7:23 AM, Sean Glover <
> > >> sean.glo...@lightbend.com>
> > >> >> wrote:
> > >> >>
> > >> >> > Type names: I vote for option 2.  The user must explicitly add a
> > >> >> dependency
> > >> >> > to this library and the wrapper types are in a different package.
> > It
> > >> >> seems
> > >> >> > reasonable to expect them to do an import rename if there's a
> need
> > to
> > >> >> drop
> > >> >> > down to the Java API.
> > >> >> >
> > >> >> > Test Utils: The test utils in kafka-streams-scala are nice and
> > lean,
> > >> but
> > >> >> > I'm not sure if it provides much more value than other options
> that
> > >> >> exist
> > >> >> > in the community.  There's an embedded Kafka/ZK project
> > >> implementation
> > >> >> for
> > >> >> > ScalaTest that's popular and active: manub/scalatest-embedded-
> > kakfa.
> > >> >> It
> > >> >> > implies you must also use 

[jira] [Created] (KAFKA-6687) Allow to read a topic multiple times

2018-03-19 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6687:
--

 Summary: Allow to read a topic multiple times
 Key: KAFKA-6687
 URL: https://issues.apache.org/jira/browse/KAFKA-6687
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Users often want to read a topic multiple times. However, this is not possible 
because there is a single consumer and thus a topic can only be consumed once.

Users get the exception
{quote}Exception in thread “main” 
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic 
source has already been registered by another source.
{quote}
if they use the same topic name in multiple `stream()`, `table()`, or 
`globalTable()` calls.

However, with KAFKA-6034 in place, we could allow adding a topic multiple times 
and rewrite the topology internally to only read the topic once. This would 
simplify application code as users don't need to put workarounds in place to get 
the same behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-19 Thread Guozhang Wang
Hi Richard,

I made a pass over the KIP again, some more clarifications / comments:

1. The seek() call itself is not blocking; only the following poll() call may
block, as that is when the actual metadata request happens.

2. I saw you did not include Consumer.partitionFor(),
Consumer.OffsetAndTimestamp() and Consumer.listTopics() in your KIP. On
second thought, I think it may be a better idea not to tackle them in
the same KIP, and we should probably consider in another discussion whether
or not we would change their behavior. So I agree with not including them.

3. In your wiki you mentioned "Another change shall be made to
KafkaConsumer#poll(), due to its call to updateFetchPositions() which
blocks indefinitely." This part may be a bit obscure to readers who are not
familiar with the KafkaConsumer internals, so could you please elaborate a
bit more. More specifically, I think the root causes for the public APIs
mentioned are a bit different, while the KIP's explanation sounds as if they
are all due to the same reason:

3.1 fetchCommittedOffsets(): this internal call will block forever if the
committed offsets cannot be fetched successfully, and it affects position()
and committed(). We need to break out of its internal while loop.
3.2 position() itself will loop while offsets cannot be retrieved in
the underlying async call. We need to break out of this while loop.
3.3 commitSync() passes Long.MAX_VALUE as the timeout value; we should take
the user-specified timeout when applicable.
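
To make the intended end state concrete, here is a rough usage sketch (Scala,
against the Java consumer) of the kind of bounded call this KIP proposes. The
bounded position() overload used below is hypothetical, and its exact shape
(e.g. whether it takes a TimeUnit) is still being discussed in this thread:

object BoundedPositionSketch extends App {
  import java.util.concurrent.TimeUnit
  import java.util.{Collections, Properties}

  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.common.TopicPartition
  import org.apache.kafka.common.errors.TimeoutException

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "kip-266-sketch")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  val tp = new TopicPartition("my-topic", 0)
  consumer.assign(Collections.singletonList(tp))

  try {
    // Hypothetical bounded overload: throws TimeoutException instead of blocking
    // indefinitely while the position / committed offsets are being fetched.
    val offset: Long = consumer.position(tp, 30, TimeUnit.SECONDS)
    println(s"position of $tp is $offset")
  } catch {
    case _: TimeoutException => println(s"could not fetch position of $tp within the bound")
  } finally {
    consumer.close()
  }
}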



Guozhang

On Sat, Mar 17, 2018 at 4:44 PM, Richard Yu 
wrote:

> Actually, what I said above is inaccurate. In
> testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue blocks, not
> seek.
> My assumption is that seek did not update correctly. I will be digging
> further into this.
>
>
>
> On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu 
> wrote:
>
> > One more thing: when looking through tests, I have realized that seek()
> > methods can potentially block indefinitely. As you well know, seek() is
> > called when pollOnce() or position() is active. Thus, if position()
> blocks
> > indefinitely, then so would seek(). Should bounding seek() also be
> included
> > in this KIP?
> >
> > Thanks, Richard
> >
> > On Sat, Mar 17, 2018 at 1:16 PM, Richard Yu 
> > wrote:
> >
> >> Thanks for the advice, Jason
> >>
> >> I have modified KIP-266 to include the java doc for committed() and
> other
> >> blocking methods, and I also
> >> mentioned poll() which will also be bounded. Let me know if there is
> >> anything else. :)
> >>
> >> Sincerely, Richard
> >>
> >>
> >>
> >>
> >>
> >> On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson 
> >> wrote:
> >>
> >>> Hi Richard,
> >>>
> >>> Thanks for the updates. I'm really glad you picked this up. A couple
> >>> minor
> >>> comments:
> >>>
> >>> 1. Can you list the full set of new APIs explicitly in the KIP?
> >>> Currently I
> >>> only see the javadoc for `position()`.
> >>>
> >>> 2. We should consider adding `TimeUnit` to the new methods to avoid
> unit
> >>> confusion. I know it's inconsistent with the poll() API, but I think it
> >>> was
> >>> probably a mistake not to include it there, so better not to double
> down
> >>> on
> >>> that mistake. And note that we do already have `close(long, TimeUnit)`.
> >>>
> >>> Other than that, I think the current KIP seems reasonable.
> >>>
> >>> Thanks,
> >>> Jason
> >>>
> >>> On Wed, Mar 14, 2018 at 5:00 PM, Richard Yu <
> yohan.richard...@gmail.com>
> >>> wrote:
> >>>
> >>> > Note to all: I have included bounding commitSync() and committed() in
> >>> this
> >>> > KIP.
> >>> >
> >>> > On Sun, Mar 11, 2018 at 5:05 PM, Richard Yu <
> >>> yohan.richard...@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > Hi all,
> >>> > >
> >>> > > I updated the KIP where overloading position() is now the favored
> >>> > approach.
> >>> > > Bounding position() using requestTimeoutMs has been listed as
> >>> rejected.
> >>> > >
> >>> > > Any thoughts?
> >>> > >
> >>> > > On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang 
> >>> > wrote:
> >>> > >
> >>> > >> I agree that adding the overloads is most flexible. But going for
> >>> that
> >>> > >> direction we'd do that for all the blocking call that I've listed
> >>> above,
> >>> > >> with this timeout value covering the end-to-end waiting time.
> >>> > >>
> >>> > >>
> >>> > >> Guozhang
> >>> > >>
> >>> > >> On Tue, Mar 6, 2018 at 10:02 AM, Ted Yu 
> >>> wrote:
> >>> > >>
> >>> > >> > bq. The most flexible option is to add overloads to the consumer
> >>> > >> >
> >>> > >> > This option is flexible.
> >>> > >> >
> >>> > >> > Looking at the tail of SPARK-18057, Spark dev voiced the same
> >>> choice.
> >>> > >> >
> >>> > >> > +1 for adding overload with timeout parameter.
> >>> > >> >
> >>> > >> > Cheers
> >>> > >> >
> >>> > >> > On Mon, Mar 5, 2018 at 2:42 PM, Jason Gustafson <
> >>> ja...@confluent.io>
> >>> > >> 

Build failed in Jenkins: kafka-trunk-jdk8 #2490

2018-03-19 Thread Apache Jenkins Server
See 


Changes:

[rajinisivaram] KAFKA-6676: Ensure Kafka chroot exists in system tests and use 
chroot on

[rajinisivaram] MINOR: Some logging improvements for debugging delayed produce 
status

--
[...truncated 3.48 MB...]

kafka.controller.ReplicaStateMachineTest > 
testInvalidNewReplicaToReplicaDeletionIneligibleTransition FAILED
java.lang.IllegalStateException: Shutdown in progress
at 
java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:180)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:150)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:160)
at kafka.utils.TestUtils$.tempDir(TestUtils.scala:88)
at kafka.utils.TestUtils$.createBrokerConfig(TestUtils.scala:246)
at 
kafka.controller.ReplicaStateMachineTest.(ReplicaStateMachineTest.scala:44)

kafka.controller.ReplicaStateMachineTest > 
testInvalidNonexistentReplicaToOfflineReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidNonexistentReplicaToOfflineReplicaTransition FAILED
java.lang.IllegalStateException: Shutdown in progress
at 
java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:180)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:150)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:160)
at kafka.utils.TestUtils$.tempDir(TestUtils.scala:88)
at kafka.utils.TestUtils$.createBrokerConfig(TestUtils.scala:246)
at 
kafka.controller.ReplicaStateMachineTest.(ReplicaStateMachineTest.scala:44)

kafka.controller.ReplicaStateMachineTest > 
testReplicaDeletionStartedToReplicaDeletionSuccessfulTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testReplicaDeletionStartedToReplicaDeletionSuccessfulTransition FAILED
java.lang.IllegalStateException: Shutdown in progress
at 
java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:180)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:150)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:160)
at kafka.utils.TestUtils$.tempDir(TestUtils.scala:88)
at kafka.utils.TestUtils$.createBrokerConfig(TestUtils.scala:246)
at 
kafka.controller.ReplicaStateMachineTest.(ReplicaStateMachineTest.scala:44)

kafka.controller.ReplicaStateMachineTest > 
testInvalidNonexistentReplicaToOnlineReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidNonexistentReplicaToOnlineReplicaTransition FAILED
java.lang.IllegalStateException: Shutdown in progress
at 
java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:180)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:150)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:160)
at kafka.utils.TestUtils$.tempDir(TestUtils.scala:88)
at kafka.utils.TestUtils$.createBrokerConfig(TestUtils.scala:246)
at 
kafka.controller.ReplicaStateMachineTest.(ReplicaStateMachineTest.scala:44)

kafka.controller.ReplicaStateMachineTest > 
testInvalidOnlineReplicaToNewReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidOnlineReplicaToNewReplicaTransition FAILED
java.lang.IllegalStateException: Shutdown in progress
at 
java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:180)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:150)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:160)
at kafka.utils.TestUtils$.tempDir(TestUtils.scala:88)
at kafka.utils.TestUtils$.createBrokerConfig(TestUtils.scala:246)
at 
kafka.controller.ReplicaStateMachineTest.(ReplicaStateMachineTest.scala:44)

kafka.controller.ReplicaStateMachineTest > 
testReplicaDeletionStartedToReplicaDeletionIneligibleTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testReplicaDeletionStartedToReplicaDeletionIneligibleTransition FAILED
java.lang.IllegalStateException: Shutdown in progress
at 

[jira] [Created] (KAFKA-6686) Allow reading topic as KStream and GlobalKTable

2018-03-19 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6686:
--

 Summary: Allow reading topic as KStream and GlobalKTable
 Key: KAFKA-6686
 URL: https://issues.apache.org/jira/browse/KAFKA-6686
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


Users often want to read a topic multiple times, e.g., as a stream and a table. For 
KStream/KTable this is possible by reading the data as a KTable, disabling caching 
on the KTable, and getting a stream via {{toStream()}}. This pattern does not work 
for KStream/GlobalKTable though.
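
For reference, here is a sketch (Scala, plain Java DSL; topic and store names
are made up) of the KStream/KTable workaround described above: the topic is
registered only once, as a table, and the stream view is derived from it.

object ReadTopicAsTableAndStream {
  import org.apache.kafka.streams.StreamsBuilder
  import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized}
  import org.apache.kafka.streams.state.Stores

  val builder = new StreamsBuilder()

  // Register the topic once, as a table with caching disabled
  // (the configured default serdes apply here).
  val clicksTable: KTable[String, String] =
    builder.table(
      "clicks",
      Materialized.as[String, String](Stores.inMemoryKeyValueStore("clicks-store"))
        .withCachingDisabled())

  // Derive the stream view from the table instead of calling stream("clicks") again.
  val clicksStream: KStream[String, String] = clicksTable.toStream()
}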

However, there is no need to raise the exception
{quote}Exception in thread “main” 
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic 
source has already been registered by another source.
{quote}
for this case, because the GlobalKTable is consumed by a different consumer (in 
contrast to the KStream/KTable case, for which KStream and KTable use a shared 
consumer).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-19 Thread Guozhang Wang
Hi Matthias,

About 2: yeah I guess this is a subjective preference. My main concern
about keeping the config / handling code beyond 1.2 release is that it will
become a non-cleanable tech debt forever, as fewer and fewer users would
need to upgrade from 0.10.x and 1.1.x, and eventually we will need to
maintain this for nearly no one. On the other hand, I agree that this tech
debt is not too large. So if more people feel this is a good tradeoff to
pay for not enforcing users from older versions to upgrade twice, I'm happy
to change my opinion.

A few more minor comments:

4. For the values of "upgrade.from", could we simplify to only major.minor?
I.e. "0.10.0" rather than "0.10.0.x"? Since we never changed compatibility
behavior in bug-fix releases, we would never need to specify a bug-fix version
to distinguish them.

5. Could you also present the encoding format in subscription / assignment
metadata bytes in version 2, and in future versions (i.e. which first bytes
would be kept moving forward), for readers to better understand the
proposal? some snippet like ProduceRequest / ProduceRequest in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
would be very helpful.
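
To illustrate the kind of snippet I have in mind (purely illustrative; the KIP
should spell out the authoritative byte layout, the only idea captured here is
that the used-version field stays first so older leaders can still read it,
and that the new format adds a latest-supported-version field right after it):

SubscriptionInfo (up to v2) => UsedVersion RemainingV2Fields
  UsedVersion            => Int32   // first field, kept in all future versions
  RemainingV2Fields      => Bytes   // existing payload (process id, prev/standby tasks, endpoint)

SubscriptionInfo (v3 onwards) => UsedVersion LatestSupportedVersion VersionSpecificFields
  UsedVersion            => Int32   // still the first field
  LatestSupportedVersion => Int32   // newly added, also kept in all future versions
  VersionSpecificFields  => Bytes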



Guozhang


On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax 
wrote:

> Thanks for your comments.
>
> 1. Because the old leader cannot decode the new Subscription it can only
> send an empty assignment back. The idea to send empty assignments to all
> members is interesting. I will try this out in an PR to see how it behaves.
>
> 2. I don't see an issue with keeping config `upgrade.from` for future
> releases. Personally, I would prefer to not force users to do two
> upgrades if they want to go from pre-1.2 to post-1.2 version. Is there a
> technical argument why you want to get rid of the config? What
> disadvantages do you see keeping `upgrade.from` beyond 1.2 release?
>
> Keeping the config is just a few lines of code in `StreamsConfig` as
> well we a single `if` statement in `StreamsPartitionAssignor` to force a
> downgrade (cf
> https://github.com/apache/kafka/pull/4636/files#diff-
> 392371c29384e33bb09ed342e7696c68R201)
>
>
> 3. I updated the KIP accordingly.
>
>
> -Matthias
>
> On 3/15/18 3:19 PM, Guozhang Wang wrote:
> > Hello Matthias, thanks for the KIP. Here are some comments:
> >
> > 1. "For all other instances the leader sends a regular Assignment in
> > version X back." Does that mean the leader will exclude any member of the
> > group whose protocol version that it does not understand? For example, if
> > we have A, B, C with A the leader, and B bounced with the newer version.
> In
> > the first rebalance, A will only consider {A, C} for assignment while
> > sending empty assignment to B. And then later when B downgrades will it
> > re-assign the tasks to it again? I felt this is unnecessarily increasing
> > the num. rebalances and the total latency. Could the leader just sends
> > empty assignment to everyone, and since upon receiving the empty
> assignment
> > each thread will not create / restore any tasks and will not clean up its
> > local state (so that the prevCachedTasks are not lost in future
> rebalances)
> > and re-joins immediately, if users choose to bounce an instance once it
> is
> > in RUNNING state the total time of rolling upgrades will be reduced.
> >
> > 2. If we want to allow upgrading from 1.1- versions to any of the future
> > versions beyond 1.2, then we'd always need to keep the special handling
> > logic for this two rolling-bounce mechanism plus a config that we would
> > never be able to deprecate; on the other hand, if the version probing
> > procedure is fast, I think the extra operational cost from upgrading from
> > 1.1- to a future version, to upgrading from 1.1- to 1.2, and then another
> > upgrade from 1.2 to a future version could be small. So depending on the
> > experimental result of the upgrade latency, I'd suggest considering the
> > trade-off of the extra code/config needed maintaining for the special
> > handling.
> >
> > 3. Testing plan: could you elaborate a bit more on the actual
> upgrade-paths
> > we should test? For example, I'm thinking the following:
> >
> > a. 0.10.0 -> 1.2
> > b. 1.1 -> 1.2
> > c. 1.2 -> 1.3 (simulated v4)
> > d. 0.10.0 -> 1.3 (simulated v4)
> > e. 1.1 -> 1.3 (simulated v4)
> >
> > Guozhang
> >
> >
> >
> >
> > On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax  >
> > wrote:
> >
> >> Hi,
> >>
> >> I want to propose KIP-268 to allow rebalance metadata version upgrades
> >> in Kafka Streams:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> >>
> >> Looking forward to your feedback.
> >>
> >>
> >> -Matthias
> >>
> >>
> >
> >
>
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-270 A Scala wrapper library for Kafka Streams

2018-03-19 Thread Guozhang Wang
Hello Debasish,

Thanks for the KIP. Here are some comments:

1. About the naming: I'd also vote for option 2 and have users rename on
import, since this scenario seems rare to me.

2. About the dependency: since this KIP can only be merged as early as 1.2,
it means that a user who wants to use this artifact, say for version
1.2, would need to bring in "kafka-streams" version 1.2. In addition,
if we change the Java API in future versions we'd also need to update
the Scala wrapper in the same version, so in other words
"kafka-streams-scala" version X has to go with "kafka-streams" version X
anyway. So I'd suggest we remove the version number in "kafka-streams-scala
only depends on the Scala standard library and Kafka Streams" as it may be
confusing to readers.

3. The KIP discussion needs to be based on the proposed state of this
project in AK. More specifically, the groupId in maven / gradle needs
to be "org.apache.kafka", and the version needs to be a Kafka release
version, e.g. 1.2.0.

4. The "New or Changed Public Interfaces" section does not mention the
serde classes like "DefaultSerdes", but I think they should also be
considered public APIs, as users would most likely import these in their
implementations.

5. Could you also list the changes you'd propose to make to build.gradle in
kafka for adding this artifact? More details will help readers better
understand your proposal.

6. I think it'd be good to have WordCount example code as part of
this KIP, to illustrate how to code against this Scala wrapper, e.g. in
o.a.k.scala.examples. But for this class we probably do not need a
separate artifact as we did for kafka-streams-examples.
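
Something along these lines would already help (rough sketch only; the
package, the StreamsBuilderS / KStreamS style names and the implicit default
serdes are assumptions taken from the KIP draft and the existing Lightbend
library, and may change with the naming decision above):

object WordCountScalaSketch extends App {
  import java.util.Properties

  import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
  // Assumed names from the KIP draft, not a finalized API:
  import org.apache.kafka.streams.scala.StreamsBuilderS
  import org.apache.kafka.streams.scala.DefaultSerdes._

  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-scala-sketch")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val builder = new StreamsBuilderS()

  // No Consumed / Serde boilerplate: serdes are resolved implicitly at compile time.
  val textLines = builder.stream[String, String]("streams-plaintext-input")

  val wordCounts =
    textLines
      .flatMapValues(_.toLowerCase.split("\\W+").toIterable) // one record per word
      .groupBy((_, word) => word)                            // re-key by word
      .count()                                                // word -> running count

  wordCounts.toStream.to("streams-wordcount-output")

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
  sys.ShutdownHookThread(streams.close())
}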


Guozhang


On Mon, Mar 19, 2018 at 9:16 AM, Ted Yu  wrote:

> I agree with Sean on name unification.
>
> +1 to option 2.
>
> On Mon, Mar 19, 2018 at 7:23 AM, Sean Glover 
> wrote:
>
> > Type names: I vote for option 2.  The user must explicitly add a
> dependency
> > to this library and the wrapper types are in a different package.  It
> seems
> > reasonable to expect them to do an import rename if there's a need to
> drop
> > down to the Java API.
> >
> > Test Utils: The test utils in kafka-streams-scala are nice and lean, but
> > I'm not sure if it provides much more value than other options that exist
> > in the community.  There's an embedded Kafka/ZK project implementation
> for
> > ScalaTest that's popular and active: manub/scalatest-embedded-kakfa.  It
> > implies you must also use ScalaTest, which I acknowledge isn't everyone's
> > first choice for Scala test framework, but it probably is one of, if not
> > the most, popular library.  It includes a DSL for Kafka Streams too.  If
> > this KIP is accepted then perhaps a PR to that project could be made to
> > support the new wrapper implementations.
> >
> > https://github.com/manub/scalatest-embedded-kafka#
> > scalatest-embedded-kafka-streams
> >
> > Sean
> >
> > On Sun, Mar 18, 2018 at 4:05 AM, Debasish Ghosh <
> > debasish.gh...@lightbend.com> wrote:
> >
> > > >
> > > > > Should this be 1.2  (maybe it's even better to not put any version
> at
> > > > all)
> > >
> > >
> > > Actually wanted to emphasize that the support is from 1.0.0 onwards ..
> > > Should we make that explicit ? Like ..
> > >
> > > kafka-streams-scala only depends on the Scala standard library and
> Kafka
> > > > Streams 1.0.0+.
> > >
> > >
> > >  In 1.1 release, we add a new module `kafka-streams-test-utils` to
> > simplify
> > > > testing for Kafka Streams applications. Are those test utils suitable
> > for
> > > > Scala users or should we add Scala wrappers for those, too?
> > >
> > >
> > > I will check up and let you know ..
> > >
> > > Also I am not clear about the decision on renaming of Scala
> abstractions.
> > > Can we have a consensus on this ? Here's the summary ..
> > >
> > > *Option 1:* Keep names separate (KStream for Java class, KStreamS for
> > > Scala). No renaming of imports required.
> > > *Option 2:* Unify names (KStream for Java and Scala class names). No
> > > conflict since they will reside in different packages. But if we need
> to
> > > use both abstractions, renaming of imports are required. But again,
> this
> > > may not be a too frequent use case.
> > >
> > > Suggestions ?
> > >
> > > regards.
> > >
> > > On Sat, Mar 17, 2018 at 3:07 AM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > Thanks a lot for the KIP! Two questions:
> > > >
> > > > 1) the KIP states:
> > > >
> > > > > kafka-streams-scala only depends on the Scala standard library and
> > > Kafka
> > > > Streams 1.0.0.
> > > >
> > > > Should this be 1.2  (maybe it's even better to not put any version at
> > > all)
> > > >
> > > >
> > > > 2) In 1.1 release, we add a new module `kafka-streams-test-utils` to
> > > > simplify testing for Kafka Streams applications. Are those test utils
> > > > suitable for Scala users or should we add 

Re: [DISCUSS] KIP-270 A Scala wrapper library for Kafka Streams

2018-03-19 Thread Debasish Ghosh
Thanks Guozhang for the comments .. we will start working on them ..

regards.

On Mon, Mar 19, 2018 at 11:02 PM, Guozhang Wang  wrote:

> And one more comment about the type safety:
>
> 7. I'm in favor of the approach of enforcing type safety at compile time,
> since with Scala's strong typing system I think it makes more sense to get
> rid of config-based serde specifications that can cause runtime error in
> the Scala wrapper.
>
>
> Guozhang
>
>
> On Mon, Mar 19, 2018 at 10:28 AM, Guozhang Wang 
> wrote:
>
> > Hello Debasish,
> >
> > Thanks for the KIP. Here are some comments:
> >
> > 1. About the naming: I'd also vote for option 2 and enforce users to
> > rename when import since this scenario seems rare to me.
> >
> > 2. About the dependency: since this KIP can only be merged as early as
> > 1.2, it means that for users who wants to use this artifact, say for
> > version 1.2, she would need to bring in "kafka-streams" version 1.2. In
> > addition, if we change the Java API in the future versions we'd also need
> > to update the Scala wrapper at the same version as well, so in other
> words
> > "kafka-streams-scala" version X have to be with "kafka-streams" version X
> > anyways. So I'd suggest we remove the version number in
> "kafka-streams-scala
> > only depends on the Scala standard library and Kafka Streams" as it may
> > be confusing to readers.
> >
> > 3. For the KIP discussion, it need be based on the proposed state of this
> > project in AK. More specifically, for the groupId in maven / gradle, it
> > need to be "org.apache.kafka"; for the version, it need to be Kafka
> release
> > versions, e.g. 1.2.0.
> >
> > 4. In "New or Changed Public Interfaces" section, it does not mention the
> > serde classes like "DefaultSerdes", but I think they should also be
> > considered public APIs as users would most likely import these in her
> > implementation.
> >
> > 5. Could you also list the changes you'd propose to made to build.gradle
> > in kafka for adding this artifact? More details will help readers to
> better
> > understand your proposal.
> >
> > 6. I think it'd will be good to have a WordCount example code as part of
> > this KIP, to illustrate how to code in this Scala wrapper, e.g. in
> > o.a.k.scala.examples. But for this class we probably do not need to have
> a
> > separate artifact for it as we did in kafka-streams-examples.
> >
> >
> > Guozhang
> >
> >
> > On Mon, Mar 19, 2018 at 9:16 AM, Ted Yu  wrote:
> >
> >> I agree with Sean on name unification.
> >>
> >> +1 to option 2.
> >>
> >> On Mon, Mar 19, 2018 at 7:23 AM, Sean Glover  >
> >> wrote:
> >>
> >> > Type names: I vote for option 2.  The user must explicitly add a
> >> dependency
> >> > to this library and the wrapper types are in a different package.  It
> >> seems
> >> > reasonable to expect them to do an import rename if there's a need to
> >> drop
> >> > down to the Java API.
> >> >
> >> > Test Utils: The test utils in kafka-streams-scala are nice and lean,
> but
> >> > I'm not sure if it provides much more value than other options that
> >> exist
> >> > in the community.  There's an embedded Kafka/ZK project implementation
> >> for
> >> > ScalaTest that's popular and active: manub/scalatest-embedded-kakfa.
> >> It
> >> > implies you must also use ScalaTest, which I acknowledge isn't
> >> everyone's
> >> > first choice for Scala test framework, but it probably is one of, if
> not
> >> > the most, popular library.  It includes a DSL for Kafka Streams too.
> If
> >> > this KIP is accepted then perhaps a PR to that project could be made
> to
> >> > support the new wrapper implementations.
> >> >
> >> > https://github.com/manub/scalatest-embedded-kafka#
> >> > scalatest-embedded-kafka-streams
> >> >
> >> > Sean
> >> >
> >> > On Sun, Mar 18, 2018 at 4:05 AM, Debasish Ghosh <
> >> > debasish.gh...@lightbend.com> wrote:
> >> >
> >> > > >
> >> > > > > Should this be 1.2  (maybe it's even better to not put any
> >> version at
> >> > > > all)
> >> > >
> >> > >
> >> > > Actually wanted to emphasize that the support is from 1.0.0 onwards
> ..
> >> > > Should we make that explicit ? Like ..
> >> > >
> >> > > kafka-streams-scala only depends on the Scala standard library and
> >> Kafka
> >> > > > Streams 1.0.0+.
> >> > >
> >> > >
> >> > >  In 1.1 release, we add a new module `kafka-streams-test-utils` to
> >> > simplify
> >> > > > testing for Kafka Streams applications. Are those test utils
> >> suitable
> >> > for
> >> > > > Scala users or should we add Scala wrappers for those, too?
> >> > >
> >> > >
> >> > > I will check up and let you know ..
> >> > >
> >> > > Also I am not clear about the decision on renaming of Scala
> >> abstractions.
> >> > > Can we have a consensus on this ? Here's the summary ..
> >> > >
> >> > > *Option 1:* Keep names separate (KStream for Java class, KStreamS
> for
> >> > > Scala). No renaming of imports required.
> >> > > 

Re: [DISCUSS] KIP-270 A Scala wrapper library for Kafka Streams

2018-03-19 Thread Ted Yu
I agree with Sean on name unification.

+1 to option 2.

On Mon, Mar 19, 2018 at 7:23 AM, Sean Glover 
wrote:

> Type names: I vote for option 2.  The user must explicitly add a dependency
> to this library and the wrapper types are in a different package.  It seems
> reasonable to expect them to do an import rename if there's a need to drop
> down to the Java API.
>
> Test Utils: The test utils in kafka-streams-scala are nice and lean, but
> I'm not sure if it provides much more value than other options that exist
> in the community.  There's an embedded Kafka/ZK project implementation for
> ScalaTest that's popular and active: manub/scalatest-embedded-kakfa.  It
> implies you must also use ScalaTest, which I acknowledge isn't everyone's
> first choice for Scala test framework, but it probably is one of, if not
> the most, popular library.  It includes a DSL for Kafka Streams too.  If
> this KIP is accepted then perhaps a PR to that project could be made to
> support the new wrapper implementations.
>
> https://github.com/manub/scalatest-embedded-kafka#
> scalatest-embedded-kafka-streams
>
> Sean
>
> On Sun, Mar 18, 2018 at 4:05 AM, Debasish Ghosh <
> debasish.gh...@lightbend.com> wrote:
>
> > >
> > > > Should this be 1.2  (maybe it's even better to not put any version at
> > > all)
> >
> >
> > Actually wanted to emphasize that the support is from 1.0.0 onwards ..
> > Should we make that explicit ? Like ..
> >
> > kafka-streams-scala only depends on the Scala standard library and Kafka
> > > Streams 1.0.0+.
> >
> >
> >  In 1.1 release, we add a new module `kafka-streams-test-utils` to
> simplify
> > > testing for Kafka Streams applications. Are those test utils suitable
> for
> > > Scala users or should we add Scala wrappers for those, too?
> >
> >
> > I will check up and let you know ..
> >
> > Also I am not clear about the decision on renaming of Scala abstractions.
> > Can we have a consensus on this ? Here's the summary ..
> >
> > *Option 1:* Keep names separate (KStream for Java class, KStreamS for
> > Scala). No renaming of imports required.
> > *Option 2:* Unify names (KStream for Java and Scala class names). No
> > conflict since they will reside in different packages. But if we need to
> > use both abstractions, renaming of imports are required. But again, this
> > may not be a too frequent use case.
> >
> > Suggestions ?
> >
> > regards.
> >
> > On Sat, Mar 17, 2018 at 3:07 AM, Matthias J. Sax 
> > wrote:
> >
> > > Thanks a lot for the KIP! Two questions:
> > >
> > > 1) the KIP states:
> > >
> > > > kafka-streams-scala only depends on the Scala standard library and
> > Kafka
> > > Streams 1.0.0.
> > >
> > > Should this be 1.2  (maybe it's even better to not put any version at
> > all)
> > >
> > >
> > > 2) In 1.1 release, we add a new module `kafka-streams-test-utils` to
> > > simplify testing for Kafka Streams applications. Are those test utils
> > > suitable for Scala users or should we add Scala wrappers for those,
> too?
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 3/16/18 11:10 AM, Ted Yu wrote:
> > > > Import renames seem to be fine.
> > > >
> > > > The class names with trailing 'S' look clean.
> > > >
> > > > Cheers
> > > >
> > > > On Fri, Mar 16, 2018 at 11:04 AM, Ismael Juma 
> > wrote:
> > > >
> > > >> If this is rare (as it sounds), relying on import renames seems fine
> > to
> > > me.
> > > >> Let's see what others think.
> > > >>
> > > >> Ismael
> > > >>
> > > >> On Fri, Mar 16, 2018 at 10:51 AM, Debasish Ghosh <
> > > >> debasish.gh...@lightbend.com> wrote:
> > > >>
> > > >>> I am not sure if this is practical or not. But theoretically a user
> > may
> > > >>> want to extract the unsafe Java abstraction from the Scala ones and
> > use
> > > >>> Java APIs on them .. e.g.
> > > >>>
> > > >>> val userClicksStream: KStreamS[String, Long] =
> > > >>> builder.stream(userClicksTopic) // Scala abstraction
> > > >>>
> > > >>> val jStream: KStream[String, Long] = userClicksStream.inner //
> > > publishes
> > > >>> the underlying Java abstraction
> > > >>>
> > > >>> //.. work with Java, may be pass to some function written in Java
> > > >>>
> > > >>> I do realize this is somewhat of a convoluted use case and may not
> be
> > > >>> practically useful ..
> > > >>>
> > > >>> Otherwise we can very well work on the suggested approach of
> unifying
> > > the
> > > >>> names ..
> > > >>>
> > > >>> regards.
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Fri, Mar 16, 2018 at 10:28 PM, Ismael Juma 
> > > wrote:
> > > >>>
> > >  What does "mixed mode application" mean? What are the cases where
> a
> > > >> user
> > >  would want to use both APIs? I think that would help understand
> the
> > >  reasoning.
> > > 
> > >  Thanks,
> > >  Ismael
> > > 
> > >  On Fri, Mar 16, 2018 at 8:48 AM, Debasish Ghosh <
> > >  debasish.gh...@lightbend.com> wrote:
> > > 
> > > > Hi Damian -
> 

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-19 Thread Ted Yu
bq. some snippet like ProduceRequest / ProduceRequest

Did you mean ProduceRequest / Response ?

Cheers

On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang  wrote:

> Hi Matthias,
>
> About 2: yeah I guess this is a subjective preference. My main concern
> about keeping the config / handling code beyond 1.2 release is that it will
> become a non-cleanable tech debt forever, as fewer and fewer users would
> need to upgrade from 0.10.x and 1.1.x, and eventually we will need to
> maintain this for nearly no one. On the other hand, I agree that this tech
> debt is not too large. So if more people feel this is a good tradeoff to
> pay for not enforcing users from older versions to upgrade twice I'm happen
> to change my opinion.
>
> A few more minor comments:
>
> 4. For the values of "upgrade.from", could we simply to only major.minor?
> I.e. "0.10.0" than "0.10.0.x" ? Since we never changed compatibility
> behavior in bug fix releases we would not need to specify a bug-fix version
> to distinguish ever.
>
> 5. Could you also present the encoding format in subscription / assignment
> metadata bytes in version 2, and in future versions (i.e. which first bytes
> would be kept moving forward), for readers to better understand the
> proposal? some snippet like ProduceRequest / ProduceRequest in
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> would be very helpful.
>
>
>
> Guozhang
>
>
> On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax 
> wrote:
>
> > Thanks for your comments.
> >
> > 1. Because the old leader cannot decode the new Subscription it can only
> > send an empty assignment back. The idea to send empty assignments to all
> > members is interesting. I will try this out in an PR to see how it
> behaves.
> >
> > 2. I don't see an issue with keeping config `upgrade.from` for future
> > releases. Personally, I would prefer to not force users to do two
> > upgrades if they want to go from pre-1.2 to post-1.2 version. Is there a
> > technical argument why you want to get rid of the config? What
> > disadvantages do you see keeping `upgrade.from` beyond 1.2 release?
> >
> > Keeping the config is just a few lines of code in `StreamsConfig` as
> > well we a single `if` statement in `StreamsPartitionAssignor` to force a
> > downgrade (cf
> > https://github.com/apache/kafka/pull/4636/files#diff-
> > 392371c29384e33bb09ed342e7696c68R201)
> >
> >
> > 3. I updated the KIP accordingly.
> >
> >
> > -Matthias
> >
> > On 3/15/18 3:19 PM, Guozhang Wang wrote:
> > > Hello Matthias, thanks for the KIP. Here are some comments:
> > >
> > > 1. "For all other instances the leader sends a regular Assignment in
> > > version X back." Does that mean the leader will exclude any member of
> the
> > > group whose protocol version that it does not understand? For example,
> if
> > > we have A, B, C with A the leader, and B bounced with the newer
> version.
> > In
> > > the first rebalance, A will only consider {A, C} for assignment while
> > > sending empty assignment to B. And then later when B downgrades will it
> > > re-assign the tasks to it again? I felt this is unnecessarily
> increasing
> > > the num. rebalances and the total latency. Could the leader just sends
> > > empty assignment to everyone, and since upon receiving the empty
> > assignment
> > > each thread will not create / restore any tasks and will not clean up
> its
> > > local state (so that the prevCachedTasks are not lost in future
> > rebalances)
> > > and re-joins immediately, if users choose to bounce an instance once it
> > is
> > > in RUNNING state the total time of rolling upgrades will be reduced.
> > >
> > > 2. If we want to allow upgrading from 1.1- versions to any of the
> future
> > > versions beyond 1.2, then we'd always need to keep the special handling
> > > logic for this two rolling-bounce mechanism plus a config that we would
> > > never be able to deprecate; on the other hand, if the version probing
> > > procedure is fast, I think the extra operational cost from upgrading
> from
> > > 1.1- to a future version, to upgrading from 1.1- to 1.2, and then
> another
> > > upgrade from 1.2 to a future version could be small. So depending on
> the
> > > experimental result of the upgrade latency, I'd suggest considering the
> > > trade-off of the extra code/config needed maintaining for the special
> > > handling.
> > >
> > > 3. Testing plan: could you elaborate a bit more on the actual
> > upgrade-paths
> > > we should test? For example, I'm thinking the following:
> > >
> > > a. 0.10.0 -> 1.2
> > > b. 1.1 -> 1.2
> > > c. 1.2 -> 1.3 (simulated v4)
> > > d. 0.10.0 -> 1.3 (simulated v4)
> > > e. 1.1 -> 1.3 (simulated v4)
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> I want to propose KIP-268 to allow rebalance metadata 

Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

2018-03-19 Thread Vahid S Hashemian
Hi Jason,

Thanks for your feedback and suggestion.

I updated the name "empty_state_timestamp" to "current_state_timestamp" to 
expand its usage to all state changes. If you can think of a better name 
let me know.
And, I fixed the statement on Dead group state to include the coordinator 
change case.
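
To make sure we are talking about the same semantics, the rule now described 
in the KIP boils down to roughly the following (illustrative sketch only; the 
field and method names are made up and are not the actual GroupMetadataManager 
code):

object OffsetExpirationSketch {
  // Consumer groups: offsets expire only after the group has stayed Empty for longer
  // than the broker retention, measured from current_state_timestamp.
  // Standalone ("simple") consumers: offsets expire based on their commit timestamp.
  // (The additional case of a live group that dropped its subscription to a topic,
  // discussed below, is omitted here.)
  case class GroupState(isConsumerGroup: Boolean, isEmpty: Boolean, currentStateTimestamp: Long)

  def offsetsExpired(now: Long, retentionMs: Long, group: GroupState, commitTimestamp: Long): Boolean =
    if (group.isConsumerGroup)
      group.isEmpty && (now - group.currentStateTimestamp) > retentionMs
    else
      (now - commitTimestamp) > retentionMs
}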

Thanks!
--Vahid



From:   Jason Gustafson 
To: dev@kafka.apache.org
Date:   03/17/2018 11:50 AM
Subject:Re: [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer Group Offsets



Hey Vahid,

Sorry for the delay. I've read through the current KIP and it looks good. 
I
had one minor suggestion: instead of making the timestamp in the group
metadata state message specific to the empty state transition (i.e.
"empty_state_timestamp"), could we leave it generic and let it indicate 
the
time of the current state change (whether it is to Empty, Stable, or
whatever)? Once a group becomes empty, we do not update the state until
deletion anyway, so the timestamp would not change.

Also a minor correction. When a group transitions to Dead, it does not
necessarily indicate offsets have been removed. We also use this state when
there is a coordinator change and the group is unloaded from the
coordinator cache.

Thanks,
Jason




On Tue, Mar 6, 2018 at 4:41 PM, Vahid S Hashemian 
 wrote:

> Hi Jason,
>
> Thanks a lot for your clarification and feedback.
> Your statements below all seem reasonable to me.
>
> I have updated the KIP according to the conversation so far.
> It contains significant changes compared to the initial version, so it
> might be worth glancing over the whole thing one more time in case I've
> missed something :)
>
> Thanks.
> --Vahid
>
>
>
> From:   Jason Gustafson 
> To: dev@kafka.apache.org
> Date:   03/05/2018 03:42 PM
> Subject:Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Hey Vahid,
>
> On point #1 below: since the expiration timer starts ticking after the
> > group becomes Empty the expire_timestamp of group offsets will be set
> when
> > that transition occurs. In normal cases that expire_timestamp is
> > calculated as "current timestamp" + "broker's offset retention". Then 
if
> > an old client provides a custom retention, we probably need a way to
> store
> > that custom retention (and use it once the group becomes Empty). One
> place
> > to store it is in group metadata message, but the issue is we would be
> > introducing a new field only for backward compatibility (new clients
> don't
> > overwrite the broker's retention), unless we somehow want to support
> this
> > retention on a per-group basis. What do you think?
>
>
> Here's what I was thinking. The current offset commit schema looks like
> this:
>
> OffsetCommit =>
>   Offset => Long
>   Metadata => String
>   CommitTimestamp => Long
>   ExpireTimestmap => Long
>
> If we have any clients that ask for an explicit retention timeout, then 
we
> can continue using this schema and providing the current behavior. The
> offsets will be retained until they are individually expired.
>
> For newer clients or those that request the default retention, we can 
bump
> the schema and remove ExpireTimestamp.
>
> OffsetCommit =>
>   Offset => Long
>   Metadata => String
>   CommitTimestamp => Long
>
> We also need to bump the version of the group metadata schema to include
> the timestamp of the state change. There are two cases: standalone
> "simple"
> consumers and consumer groups.
>
> 1) For standalone consumers, we'll expire based on the commit timestamp 
of
> the offset message. Internally, the group will be Empty and have no
> transition timestamp, so the expiration criteria is when (now - commit
> timestamp) is greater than the configured retention time.
>
> 2) For consumer groups, we'll expire based on the timestamp that the 
group
> transitioned to Empty.
>
> This way, changing the retention time config will affect all existing
> groups except those from older clients that are requesting an explicit
> retention time. Would that work?
>
> On point #3: as you mentioned, currently there is no "notification"
> > mechanism for GroupMetadataManager in place when a subscription change
> > occurs. The member subscription however is available in the group
> metadata
> > and a poll approach could be used to check group subscriptions on a
> > regular basis and expire stale offsets (if there are topics the group 
no
> > longer is subscribed to). This can be done as part of the offset 
cleanup
> > scheduled task that by default does not run very frequently. Were you
> > thinking of a different method for capturing the subscription change?
>
>
> Yes, I think that can work.  So we would expire offsets for a consumer
> group individually if they have reached the retention time and the group
> is
> not empty, but is no longer subscribed to them. Is that right?
>
>
> Thanks,
> Jason
>
>
>
>
> On Fri, Mar 2, 2018 at 3:36 PM, 

Re: [DISCUSS] KIP-270 A Scala wrapper library for Kafka Streams

2018-03-19 Thread Guozhang Wang
And one more comment about the type safety:

7. I'm in favor of the approach of enforcing type safety at compile time,
since with Scala's strong typing system I think it makes more sense to get
rid of config-based serde specifications that can cause runtime errors in
the Scala wrapper.
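
For example, something along these lines (a sketch only; the package and
wrapper type names are placeholders, not the final API):

object TypeSafeSerdesSketch {
  import org.apache.kafka.common.serialization.Serde
  // Hypothetical wrapper types from the proposed org.apache.kafka.streams.scala package:
  import org.apache.kafka.streams.scala.{KStreamS, StreamsBuilderS}

  // Serdes come in as implicits, so a missing serde for K or V is a compile error
  // instead of a runtime failure caused by an unsuitable default.key/value.serde.
  def streamOf[K, V](builder: StreamsBuilderS, topic: String)
                    (implicit keySerde: Serde[K], valueSerde: Serde[V]): KStreamS[K, V] =
    builder.stream[K, V](topic) // assumed to consume the implicit serdes internally
}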


Guozhang


On Mon, Mar 19, 2018 at 10:28 AM, Guozhang Wang  wrote:

> Hello Debasish,
>
> Thanks for the KIP. Here are some comments:
>
> 1. About the naming: I'd also vote for option 2 and enforce users to
> rename when import since this scenario seems rare to me.
>
> 2. About the dependency: since this KIP can only be merged as early as
> 1.2, it means that for users who wants to use this artifact, say for
> version 1.2, she would need to bring in "kafka-streams" version 1.2. In
> addition, if we change the Java API in the future versions we'd also need
> to update the Scala wrapper at the same version as well, so in other words
> "kafka-streams-scala" version X have to be with "kafka-streams" version X
> anyways. So I'd suggest we remove the version number in "kafka-streams-scala
> only depends on the Scala standard library and Kafka Streams" as it may
> be confusing to readers.
>
> 3. For the KIP discussion, it need be based on the proposed state of this
> project in AK. More specifically, for the groupId in maven / gradle, it
> need to be "org.apache.kafka"; for the version, it need to be Kafka release
> versions, e.g. 1.2.0.
>
> 4. In "New or Changed Public Interfaces" section, it does not mention the
> serde classes like "DefaultSerdes", but I think they should also be
> considered public APIs as users would most likely import these in her
> implementation.
>
> 5. Could you also list the changes you'd propose to made to build.gradle
> in kafka for adding this artifact? More details will help readers to better
> understand your proposal.
>
> 6. I think it'd will be good to have a WordCount example code as part of
> this KIP, to illustrate how to code in this Scala wrapper, e.g. in
> o.a.k.scala.examples. But for this class we probably do not need to have a
> separate artifact for it as we did in kafka-streams-examples.
>
>
> Guozhang
>
>
> On Mon, Mar 19, 2018 at 9:16 AM, Ted Yu  wrote:
>
>> I agree with Sean on name unification.
>>
>> +1 to option 2.
>>
>> On Mon, Mar 19, 2018 at 7:23 AM, Sean Glover 
>> wrote:
>>
>> > Type names: I vote for option 2.  The user must explicitly add a
>> dependency
>> > to this library and the wrapper types are in a different package.  It
>> seems
>> > reasonable to expect them to do an import rename if there's a need to
>> drop
>> > down to the Java API.
>> >
>> > Test Utils: The test utils in kafka-streams-scala are nice and lean, but
>> > I'm not sure if it provides much more value than other options that
>> exist
>> > in the community.  There's an embedded Kafka/ZK project implementation
>> for
>> > ScalaTest that's popular and active: manub/scalatest-embedded-kakfa.
>> It
>> > implies you must also use ScalaTest, which I acknowledge isn't
>> everyone's
>> > first choice for Scala test framework, but it probably is one of, if not
>> > the most, popular library.  It includes a DSL for Kafka Streams too.  If
>> > this KIP is accepted then perhaps a PR to that project could be made to
>> > support the new wrapper implementations.
>> >
>> > https://github.com/manub/scalatest-embedded-kafka#
>> > scalatest-embedded-kafka-streams
>> >
>> > Sean
>> >
>> > On Sun, Mar 18, 2018 at 4:05 AM, Debasish Ghosh <
>> > debasish.gh...@lightbend.com> wrote:
>> >
>> > > >
>> > > > > Should this be 1.2  (maybe it's even better to not put any
>> version at
>> > > > all)
>> > >
>> > >
>> > > Actually wanted to emphasize that the support is from 1.0.0 onwards ..
>> > > Should we make that explicit ? Like ..
>> > >
>> > > kafka-streams-scala only depends on the Scala standard library and
>> Kafka
>> > > > Streams 1.0.0+.
>> > >
>> > >
>> > >  In 1.1 release, we add a new module `kafka-streams-test-utils` to
>> > simplify
>> > > > testing for Kafka Streams applications. Are those test utils
>> suitable
>> > for
>> > > > Scala users or should we add Scala wrappers for those, too?
>> > >
>> > >
>> > > I will check up and let you know ..
>> > >
>> > > Also I am not clear about the decision on renaming of Scala
>> abstractions.
>> > > Can we have a consensus on this ? Here's the summary ..
>> > >
>> > > *Option 1:* Keep names separate (KStream for Java class, KStreamS for
>> > > Scala). No renaming of imports required.
>> > > *Option 2:* Unify names (KStream for Java and Scala class names). No
>> > > conflict since they will reside in different packages. But if we need
>> to
>> > > use both abstractions, renaming of imports are required. But again,
>> this
>> > > may not be a too frequent use case.
>> > >
>> > > Suggestions ?
>> > >
>> > > regards.
>> > >
>> > > On Sat, Mar 17, 2018 at 3:07 AM, Matthias J. Sax <
>> matth...@confluent.io>
>> > 

Re: [DISCUSS] KIP-270 A Scala wrapper library for Kafka Streams

2018-03-19 Thread Debasish Ghosh
Based on this discussion I have a question ..

For the actual implementation we need to have a PR on
https://github.com/apache/kafka and the new library will be in the package
structure org.apache.kafka.streams.scala. My question is, is this PR part
of the KIP ? Or do we work on the PR once the KIP is accepted ..

regards.

On Mon, Mar 19, 2018 at 11:28 PM, Debasish Ghosh <
debasish.gh...@lightbend.com> wrote:

> Thanks Guozhang for the comments .. we will start working on them ..
>
> regards.
>
> On Mon, Mar 19, 2018 at 11:02 PM, Guozhang Wang 
> wrote:
>
>> And one more comment about the type safety:
>>
>> 7. I'm in favor of the approach of enforcing type safety at compile time,
>> since with Scala's strong typing system I think it makes more sense to get
>> rid of config-based serde specifications that can cause runtime error in
>> the Scala wrapper.
>>
>>
>> Guozhang
>>
>>
>> On Mon, Mar 19, 2018 at 10:28 AM, Guozhang Wang 
>> wrote:
>>
>> > Hello Debasish,
>> >
>> > Thanks for the KIP. Here are some comments:
>> >
>> > 1. About the naming: I'd also vote for option 2 and enforce users to
>> > rename when import since this scenario seems rare to me.
>> >
>> > 2. About the dependency: since this KIP can only be merged as early as
>> > 1.2, it means that for users who wants to use this artifact, say for
>> > version 1.2, she would need to bring in "kafka-streams" version 1.2. In
>> > addition, if we change the Java API in the future versions we'd also
>> need
>> > to update the Scala wrapper at the same version as well, so in other
>> words
>> > "kafka-streams-scala" version X have to be with "kafka-streams" version
>> X
>> > anyways. So I'd suggest we remove the version number in
>> "kafka-streams-scala
>> > only depends on the Scala standard library and Kafka Streams" as it may
>> > be confusing to readers.
>> >
>> > 3. For the KIP discussion, it need be based on the proposed state of
>> this
>> > project in AK. More specifically, for the groupId in maven / gradle, it
>> > need to be "org.apache.kafka"; for the version, it need to be Kafka
>> release
>> > versions, e.g. 1.2.0.
>> >
>> > 4. In "New or Changed Public Interfaces" section, it does not mention
>> the
>> > serde classes like "DefaultSerdes", but I think they should also be
>> > considered public APIs as users would most likely import these in her
>> > implementation.
>> >
>> > 5. Could you also list the changes you'd propose to made to build.gradle
>> > in kafka for adding this artifact? More details will help readers to
>> better
>> > understand your proposal.
>> >
>> > 6. I think it'd will be good to have a WordCount example code as part of
>> > this KIP, to illustrate how to code in this Scala wrapper, e.g. in
>> > o.a.k.scala.examples. But for this class we probably do not need to
>> have a
>> > separate artifact for it as we did in kafka-streams-examples.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Mon, Mar 19, 2018 at 9:16 AM, Ted Yu  wrote:
>> >
>> >> I agree with Sean on name unification.
>> >>
>> >> +1 to option 2.
>> >>
>> >> On Mon, Mar 19, 2018 at 7:23 AM, Sean Glover <
>> sean.glo...@lightbend.com>
>> >> wrote:
>> >>
>> >> > Type names: I vote for option 2.  The user must explicitly add a
>> >> dependency
>> >> > to this library and the wrapper types are in a different package.  It
>> >> seems
>> >> > reasonable to expect them to do an import rename if there's a need to
>> >> drop
>> >> > down to the Java API.
>> >> >
>> >> > Test Utils: The test utils in kafka-streams-scala are nice and lean,
>> but
>> >> > I'm not sure if it provides much more value than other options that
>> >> exist
>> >> > in the community.  There's an embedded Kafka/ZK project
>> implementation
>> >> for
>> >> > ScalaTest that's popular and active: manub/scalatest-embedded-kakfa.
>> >> It
>> >> > implies you must also use ScalaTest, which I acknowledge isn't
>> >> everyone's
>> >> > first choice for Scala test framework, but it probably is one of, if
>> not
>> >> > the most, popular library.  It includes a DSL for Kafka Streams
>> too.  If
>> >> > this KIP is accepted then perhaps a PR to that project could be made
>> to
>> >> > support the new wrapper implementations.
>> >> >
>> >> > https://github.com/manub/scalatest-embedded-kafka#
>> >> > scalatest-embedded-kafka-streams
>> >> >
>> >> > Sean
>> >> >
>> >> > On Sun, Mar 18, 2018 at 4:05 AM, Debasish Ghosh <
>> >> > debasish.gh...@lightbend.com> wrote:
>> >> >
>> >> > > >
>> >> > > > > Should this be 1.2  (maybe it's even better to not put any
>> >> version at
>> >> > > > all)
>> >> > >
>> >> > >
>> >> > > Actually wanted to emphasize that the support is from 1.0.0
>> onwards ..
>> >> > > Should we make that explicit ? Like ..
>> >> > >
>> >> > > kafka-streams-scala only depends on the Scala standard library and
>> >> Kafka
>> >> > > > Streams 1.0.0+.
>> >> > >
>> >> > >
>> >> > >  In 1.1 release, we add a new module `kafka-streams-test-utils` to
>> 

Support kafka

2018-03-19 Thread johan enrique alcala este
Dear all,

I would appreciate your support: I need to configure, on a server
(Docker) Server1:

confluentinc/cp-schema-registry
confluentinc/cp-kafka-rest
landoop/kafka-topics-ui

against a Kafka cluster (3 independent nodes):

zookeeper1:2181, zookeeper2:2181, zookeeper3:2181

On Server 1 I run (Docker):

docker run -d \
  --net=host \
  --name=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181 \
  -e SCHEMA_REGISTRY_HOST_NAME=localhost \
  -e SCHEMA_REGISTRY_LISTENERS=http://localhost:8081 \
  confluentinc/cp-schema-registry:3.1.1

but the connection is not made ...



*Best Regards,*
*Johan Alcalá*
*Phone & Whatsapp: **+573178550212*
*Skype: johan_1...@hotmail.com *


Jenkins build is back to normal : kafka-1.0-jdk7 #174

2018-03-19 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-19 Thread Guozhang Wang
Yup :)

On Mon, Mar 19, 2018 at 10:01 AM, Ted Yu  wrote:

> bq. some snippet like ProduceRequest / ProduceRequest
>
> Did you mean ProduceRequest / Response ?
>
> Cheers
>
> On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang  wrote:
>
> > Hi Matthias,
> >
> > About 2: yeah I guess this is a subjective preference. My main concern
> > about keeping the config / handling code beyond 1.2 release is that it
> will
> > become a non-cleanable tech debt forever, as fewer and fewer users would
> > need to upgrade from 0.10.x and 1.1.x, and eventually we will need to
> > maintain this for nearly no one. On the other hand, I agree that this
> tech
> > debt is not too large. So if more people feel this is a good tradeoff to
> > pay for not enforcing users from older versions to upgrade twice I'm
> happen
> > to change my opinion.
> >
> > A few more minor comments:
> >
> > 4. For the values of "upgrade.from", could we simply to only major.minor?
> > I.e. "0.10.0" than "0.10.0.x" ? Since we never changed compatibility
> > behavior in bug fix releases we would not need to specify a bug-fix
> version
> > to distinguish ever.
> >
> > 5. Could you also present the encoding format in subscription /
> assignment
> > metadata bytes in version 2, and in future versions (i.e. which first
> bytes
> > would be kept moving forward), for readers to better understand the
> > proposal? some snippet like ProduceRequest / ProduceRequest in
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > would be very helpful.
> >
> >
> >
> > Guozhang
> >
> >
> > On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax 
> > wrote:
> >
> > > Thanks for your comments.
> > >
> > > 1. Because the old leader cannot decode the new Subscription it can
> only
> > > send an empty assignment back. The idea to send empty assignments to
> all
> > > members is interesting. I will try this out in an PR to see how it
> > behaves.
> > >
> > > 2. I don't see an issue with keeping config `upgrade.from` for future
> > > releases. Personally, I would prefer to not force users to do two
> > > upgrades if they want to go from pre-1.2 to post-1.2 version. Is there
> a
> > > technical argument why you want to get rid of the config? What
> > > disadvantages do you see keeping `upgrade.from` beyond 1.2 release?
> > >
> > > Keeping the config is just a few lines of code in `StreamsConfig` as
> > > well we a single `if` statement in `StreamsPartitionAssignor` to force
> a
> > > downgrade (cf
> > > https://github.com/apache/kafka/pull/4636/files#diff-
> > > 392371c29384e33bb09ed342e7696c68R201)
> > >
> > >
> > > 3. I updated the KIP accordingly.
> > >
> > >
> > > -Matthias
> > >
> > > On 3/15/18 3:19 PM, Guozhang Wang wrote:
> > > > Hello Matthias, thanks for the KIP. Here are some comments:
> > > >
> > > > 1. "For all other instances the leader sends a regular Assignment in
> > > > version X back." Does that mean the leader will exclude any member of
> > the
> > > > group whose protocol version that it does not understand? For
> example,
> > if
> > > > we have A, B, C with A the leader, and B bounced with the newer
> > version.
> > > In
> > > > the first rebalance, A will only consider {A, C} for assignment while
> > > > sending empty assignment to B. And then later when B downgrades will
> it
> > > > re-assign the tasks to it again? I felt this is unnecessarily
> > increasing
> > > > the num. rebalances and the total latency. Could the leader just
> sends
> > > > empty assignment to everyone, and since upon receiving the empty
> > > assignment
> > > > each thread will not create / restore any tasks and will not clean up
> > its
> > > > local state (so that the prevCachedTasks are not lost in future
> > > rebalances)
> > > > and re-joins immediately, if users choose to bounce an instance once
> it
> > > is
> > > > in RUNNING state the total time of rolling upgrades will be reduced.
> > > >
> > > > 2. If we want to allow upgrading from 1.1- versions to any of the
> > future
> > > > versions beyond 1.2, then we'd always need to keep the special
> handling
> > > > logic for this two rolling-bounce mechanism plus a config that we
> would
> > > > never be able to deprecate; on the other hand, if the version probing
> > > > procedure is fast, I think the extra operational cost from upgrading
> > from
> > > > 1.1- to a future version, to upgrading from 1.1- to 1.2, and then
> > another
> > > > upgrade from 1.2 to a future version could be small. So depending on
> > the
> > > > experimental result of the upgrade latency, I'd suggest considering
> the
> > > > trade-off of the extra code/config needed maintaining for the special
> > > > handling.
> > > >
> > > > 3. Testing plan: could you elaborate a bit more on the actual
> > > upgrade-paths
> > > > we should test? For example, I'm thinking the following:
> > > >
> > > > a. 0.10.0 -> 1.2
> > > > b. 1.1 -> 1.2
> > > 

Seeking Feedback on Kafka Connect Issues

2018-03-19 Thread Matt Farmer
Hi everyone,

We’ve been experimenting recently with some limited use of Kafka Connect and 
are hoping to expand to wider use cases soon. However, we had some internal 
issues that gave us a well-timed preview of error handling behavior in Kafka 
Connect. I think the fixes for this will require at least three different KIPs, 
but I want to share some thoughts to get the initial reaction from folks in the 
dev community. If these ideas seem reasonable, I can go ahead and create the 
required KIPs.

Here are the three things specifically we ran into…

---

(1) Kafka Connect only retries tasks when certain exceptions are thrown
Currently, Kafka Connect only retries tasks when certain exceptions are thrown 
- I believe the logic checks to see if the exception is specifically marked as 
“retryable” and if not, fails. We’d like to bypass this behavior and implement 
a configurable exponential backoff for tasks regardless of the failure reason. 
This is probably two changes: one to implement exponential backoff retries for
tasks if they don't already exist, and a second to implement a RetryPolicy
interface that evaluates the Exception to determine whether or not to retry.
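
To make this concrete, here is a minimal sketch of what such a RetryPolicy
interface and backoff calculation could look like. All names and signatures
below are hypothetical and not part of the current Kafka Connect API.

    // Hypothetical extension point: decide whether a failed task should be retried.
    public interface RetryPolicy {
        boolean shouldRetry(Throwable failure, int previousAttempts);
    }

    // Minimal exponential backoff helper a worker could use between retry attempts.
    class ExponentialBackoff {
        private final long initialDelayMs;
        private final long maxDelayMs;

        ExponentialBackoff(long initialDelayMs, long maxDelayMs) {
            this.initialDelayMs = initialDelayMs;
            this.maxDelayMs = maxDelayMs;
        }

        long delayMsForAttempt(int attempt) {
            // attempt 0 -> initialDelayMs, doubling each time, capped at maxDelayMs
            double delay = initialDelayMs * Math.pow(2, attempt);
            return (long) Math.min(delay, (double) maxDelayMs);
        }
    }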

(2) Kafka Connect doesn’t permit Connectors to smartly reposition after 
rebalance
We’re using the S3 connector to dump files with a large number of records into 
an S3 bucket. About 100,000 records per file. Unfortunately, every time a task 
fails, the consumer rebalance causes all partitions to get re-shuffled amongst
the various tasks. To compensate for this, the connector gets stopped and
restarted (from what I can tell from the logs), and then picks up from the last
consumer position that was committed to the brokers.

This doesn’t work great if you’re batching things into large numbers for 
archival.

For the S3 connector, for example: Let’s say I have two partitions and the 
connector has two tasks to process each of those. Task 0 is at 5,000 records 
read from the last commit and Task 1 is at 70,000 records read from the last 
commit. Then, boom, something goes wrong with Task 0 and it falls over. This 
triggers a rebalance and Task 1 has to take over the workload. Task 1 will, at 
this point, discard the 70,000 records in its buffer and start from the last 
commit point. This failure mode is brutal for the archival system we’re 
building.

There are two solutions that I can think of to this:

(A) Provide an interface for connectors to define their own rebalance listener. 
This listener could compare the newly assigned list of partitions with a 
previously assigned list. For all partitions that this connector was already 
working on prior to the rebalance, it could manually seek to the last position 
it locally processed before resuming. So, in the scenario above Task 1 could 
keep an accounting file locally and seek over the first 70,000 records without 
reprocessing them. It would then wait until after it confirms the S3 upload to 
commit those offsets back to Kafka. This ensures that if the machine running 
Task 1 dies a new consumer can take its place, but we’ll still benefit from a 
local cache if one is present.

(B) Have connect manually round robin partitions on a topic to tasks and never 
rebalance them automatically. If this were combined with better task retry 
semantics, I think this solution would be simpler.
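
To make option (A) above more concrete, here is a rough sketch of the kind of
hook a connector could implement. The interface and method names are
hypothetical; nothing like this exists in the Connect API today.

    import java.util.Collection;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical connector-level rebalance hook. A task could compare the newly
    // assigned partitions with the ones it owned before the rebalance and seek past
    // records it has already processed locally but not yet committed to Kafka.
    public interface ConnectorRebalanceListener {
        void onPartitionsAssigned(Collection<TopicPartition> newlyAssigned,
                                  Collection<TopicPartition> previouslyOwned);
    }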

(3) As far as I can tell, JMX metrics aren’t reporting the number of active 
tasks
This one is arguably the simplest issue to resolve, but we’d like to alert if 
the number of active tasks isn’t what we expect it to be so that we can have a 
human investigate.
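
As an illustration of the kind of check we would like to automate, here is a
sketch using the standard JMX API. The MBean and attribute names are
assumptions for illustration only, since (as noted) we could not find an
active-task count being reported.

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class TaskCountCheck {
        // Run inside the worker JVM, or adapt to use a remote JMXConnector.
        public static void main(String[] args) throws Exception {
            int expected = Integer.parseInt(args[0]);
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // Assumed MBean/attribute names, purely for illustration.
            ObjectName workerMetrics = new ObjectName("kafka.connect:type=connect-worker-metrics");
            Number taskCount = (Number) server.getAttribute(workerMetrics, "task-count");
            if (taskCount.intValue() != expected) {
                System.err.println("ALERT: expected " + expected + " tasks, found " + taskCount);
            }
        }
    }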

---

I would love thoughts on all of the above from anyone on this list.

Thanks,

Matt Farmer

Re: [DISCUSS] KIP-242: Mask password fields in Kafka Connect REST response

2018-03-19 Thread Matt Farmer
What’s the status of this? This is a pretty hard blocker for us to meet 
requirements internally to deploy connect in a distributed fashion.

@Ewen - Regarding the concern of accessing information securely - has there 
been any consideration of adding authentication to the connect api?

> On Jan 17, 2018, at 3:55 PM, Randall Hauch  wrote:
> 
> Vincent,
> 
> Can the KIP more explicitly say that this is opt-in, and that by default
> nothing will change?
> 
> Randall
> 
> On Tue, Jan 16, 2018 at 11:18 PM, Ewen Cheslack-Postava 
> wrote:
> 
>> Vincent,
>> 
>> I think with the addition of a configuration to control this for
>> compatibility, people would generally be ok with it. If you want to start a
>> VOTE thread, the KIP deadline is coming up and the PR looks pretty small. I
>> will take a pass at reviewing the PR so we'll be ready to merge if we can
>> get the KIP voted through.
>> 
>> Thanks,
>> Ewen
>> 
>> On Fri, Jan 12, 2018 at 10:18 AM, Vincent Meng  wrote:
>> 
>>> @Ted: The issue is kinda hard to reproduce. It's just something we
>> observe
>>> over time.
>>> 
>>> @Ewen: I agree. Opt-in seems to be a good solution to me. To your
>> question,
>>> if there is no ConfDef that defines which fields are Passwords we can
>> just
>>> return the config as is.
>>> 
>>> There is a PR for this KIP already. Comments/Discussions are welcome.
>>> https://github.com/apache/kafka/pull/4269
>>> 
>>> On Tue, Jan 2, 2018 at 8:52 PM, Ewen Cheslack-Postava >> 
>>> wrote:
>>> 
 Vincent,
 
 Thanks for the KIP. This is definitely an issue we know is a problem
>> for
 some users.
 
 I think the major problem with the KIP as-is is that it makes it
>>> impossible
 to get the original value back out of the API. This KIP probably ties
>> in
 significantly with ideas for securing the REST API (SSL) and adding
>> ACLs
>>> to
 it. Both are things we know people want, but haven't happened yet.
>>> However,
 it also interacts with other approaches to adding those features, e.g.
 layering proxies on top of the existing API (e.g. nginx, apache, etc).
>>> Just
 doing a blanket replacement of password values with a constant would
>>> likely
 break things for people who secure things via a proxy (and may just not
 allow reads of configs unless the user is authorized for the particular
 connector). These are the types of concerns we like to think through in
>>> the
 compatibility section. One option to get the masking functionality in
 without depending on a bunch of other security improvements might be to
 make this configurable so users that need this (and can forgo seeing a
 valid config via the API) can opt-in.
 
 Regarding your individual points:
 
 * I don't think the particular value for the masked content matters
>> much.
 Any constant indicating a password field is good. Your value seems fine
>>> to
 me.
 * I don't think ConnectorInfo has enough info on its own to do proper
 masking. In fact, I think you need to parse the config enough to get
>> the
 Connector-specific ConfigDef out in order to determine which fields are
 Password fields. I would probably try to push this to be as central as
 possible, maybe adding a method to AbstractHerder that can get configs
>>> with
 a boolean indicating whether they need to have sensitive fields
>> removed.
 That method could deal with parsing the config to get the right
>>> connector,
 getting the connector config, and then sanitizing any configs that are
 sensitive. We could have this in one location, then have the relevant
>>> REST
 APIs just use the right flag to determine if they get sanitized or
 unsanitized data.
 
 That second point raises another interesting point -- what happens if
>> the
 connector configuration references a connector which the worker serving
>>> the
 REST request *does not know about*? In that case, there will be no
 corresponding ConfigDef that defines which fields are Passwords and
>> need
>>> to
 be sensitized. Does it return an error? Or just return the config as
>> is?
 
 -Ewen
 
 On Thu, Dec 28, 2017 at 3:34 AM, Ted Yu  wrote:
 
> For the last point you raised, can you come up with a unit test that
 shows
> what you observed ?
> 
> Cheers
> 
> On Mon, Dec 18, 2017 at 11:14 AM, Vincent Meng 
>> wrote:
> 
>> Hi all,
>> 
>> I've created KIP-242, a proposal to secure credentials in kafka
>>> connect
>> rest endpoint.
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 242%3A+Mask+password+in+Kafka+Connect+Rest+API+response
>> 
>> Here are something I'd like to discuss:
>> 
>>   - The "masked" value is set to "*" (9 stars) currently.
>>> It's
> an
>>   arbitrary 

[jira] [Resolved] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-19 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-6661.
--
Resolution: Fixed

> Sink connectors that explicitly 'resume' topic partitions can resume a paused 
> task
> --
>
> Key: KAFKA-6661
> URL: https://issues.apache.org/jira/browse/KAFKA-6661
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
> Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to 
> explicitly pause and resume topic partitions. This is useful when connectors 
> need additional time processing the records for specific topic partitions 
> (e.g., the external system has an outage).
> However, when the sink connector has been paused via the REST API, the worker 
> for the sink tasks pause the consumer. When the connector is polled, the poll 
> request might timeout and return no records. Connect then calls the task's 
> {{put(...)}} method (with no records), and this allows the task to optionally 
> call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls 
> resume, this will unexpectedly resume the paused consumer, causing the 
> consumer to return messages and the connector to process those messages --  
> despite the connector still being paused.
> This is reported against 1.0, but the affected code has not been changed 
> since at least 0.9.0.0.
> A workaround is to remove rather than pause a connector. It's inconvenient, 
> but it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-19 Thread Richard Yu
Hi Guozhang,

I made some clarifications to KIP-266, namely:
1. Stated more specifically that commitSync will accept user input.
2. fetchCommittedOffsets(): Made its role in blocking more clear to the
reader.
3. Sketched what would happen when time limit is exceeded.

These changes should make the KIP easier to understand.

Cheers,
Richard

On Mon, Mar 19, 2018 at 9:33 AM, Guozhang Wang  wrote:

> Hi Richard,
>
> I made a pass over the KIP again, some more clarifications / comments:
>
> 1. seek() call itself is not blocking, only the following poll() call may
> be blocking as the actual metadata request will happen.
>
> 2. I saw you did not include Consumer.partitionFor(),
> Consumer.OffsetAndTimestamp() and Consumer.listTopics() in your KIP. After
> a second thought, I think this may be a better idea to not tackle them in
> the same KIP, and probably we should consider whether we would change the
> behavior or not in another discussion. So I agree to not include them.
>
> 3. In your wiki you mentioned "Another change shall be made to
> KafkaConsumer#poll(), due to its call to updateFetchPositions() which
> blocks indefinitely." This part may be a bit obscure to most readers who are not
> familiar with the KafkaConsumer internals, could you please add more
> elaborations. More specifically, I think the root causes of the public APIs
> mentioned are a bit different while the KIP's explanation sounds like they
> are due to the same reason:
>
> 3.1 fetchCommittedOffsets(): this internal call will block forever if the
> committed offsets cannot be fetched successfully and affect position() and
> committed(). We need to break out of its internal while loop.
> 3.2 position() itself will while loop when offsets cannot be retrieved in
> the underlying async call. We need to break out this while loop.
> 3.3 commitSync() passed Long.MAX_VALUE as the timeout value, we should take
> the user specified timeouts when applicable.
>
>
>
> Guozhang
>
> On Sat, Mar 17, 2018 at 4:44 PM, Richard Yu 
> wrote:
>
> > Actually, what I said above is inaccurate. In
> > testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue blocks, not
> > seek.
> > My assumption is that seek did not update correctly. I will be digging
> > further into this.
> >
> >
> >
> > On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu 
> > wrote:
> >
> > > One more thing: when looking through tests, I have realized that seek()
> > > methods can potentially block indefinitely. As you well know, seek() is
> > > called when pollOnce() or position() is active. Thus, if position()
> > blocks
> > > indefinitely, then so would seek(). Should bounding seek() also be
> > included
> > > in this KIP?
> > >
> > > Thanks, Richard
> > >
> > > On Sat, Mar 17, 2018 at 1:16 PM, Richard Yu <
> yohan.richard...@gmail.com>
> > > wrote:
> > >
> > >> Thanks for the advice, Jason
> > >>
> > >> I have modified KIP-266 to include the java doc for committed() and
> > other
> > >> blocking methods, and I also
> > >> mentioned poll() which will also be bounded. Let me know if there is
> > >> anything else. :)
> > >>
> > >> Sincerely, Richard
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson  >
> > >> wrote:
> > >>
> > >>> Hi Richard,
> > >>>
> > >>> Thanks for the updates. I'm really glad you picked this up. A couple
> > >>> minor
> > >>> comments:
> > >>>
> > >>> 1. Can you list the full set of new APIs explicitly in the KIP?
> > >>> Currently I
> > >>> only see the javadoc for `position()`.
> > >>>
> > >>> 2. We should consider adding `TimeUnit` to the new methods to avoid
> > unit
> > >>> confusion. I know it's inconsistent with the poll() API, but I think
> it
> > >>> was
> > >>> probably a mistake not to include it there, so better not to double
> > down
> > >>> on
> > >>> that mistake. And note that we do already have `close(long,
> TimeUnit)`.
> > >>>
> > >>> Other than that, I think the current KIP seems reasonable.
> > >>>
> > >>> Thanks,
> > >>> Jason
> > >>>
> > >>> On Wed, Mar 14, 2018 at 5:00 PM, Richard Yu <
> > yohan.richard...@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > Note to all: I have included bounding commitSync() and committed()
> in
> > >>> this
> > >>> > KIP.
> > >>> >
> > >>> > On Sun, Mar 11, 2018 at 5:05 PM, Richard Yu <
> > >>> yohan.richard...@gmail.com>
> > >>> > wrote:
> > >>> >
> > >>> > > Hi all,
> > >>> > >
> > >>> > > I updated the KIP where overloading position() is now the favored
> > >>> > approach.
> > >>> > > Bounding position() using requestTimeoutMs has been listed as
> > >>> rejected.
> > >>> > >
> > >>> > > Any thoughts?
> > >>> > >
> > >>> > > On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang <
> wangg...@gmail.com>
> > >>> > wrote:
> > >>> > >
> > >>> > >> I agree that adding the overloads is most flexible. But going
> for
> > >>> that
> > >>> > >> direction we'd do that for all the blocking call that I've
> listed
> 

Re: [DISCUSS] KIP-250 Add Support for Quorum-based Producer Acknowledgment

2018-03-19 Thread Jun Rao
Hi, Jason,

Thanks for the comment. I didn't realize that the KIP was only focusing on
the latency of the producer. From an application's perspective, if an
application is latency sensitive, it probably wants to optimize the
end-to-end latency from the producer to the consumer. Litao, would you
agree with that?

Assuming that we want to also optimize the consumer latency, a second
question is whether it's still useful to maintain the concept of ISR. ISR
may not be strictly needed since one can just elect the leader from RF - Q
+ 1 replicas (RF is replication factor and Q is the quorum size). However,
there is some benefit of maintaining ISR when Q is not the majority.
Consider RF = 4 and Q = 2. Let's say at some point only 3 replicas are
alive and we only wait for the 2 fastest replicas. Then, another replica
fails. If we don't maintain ISR, then we can't select the new leader since
we need 3 replicas for leader election. If we maintain ISR, we can select
the new leader in this case. Arguably this may not be a common use case.
However, this gives people more flexibility to optimize the latency.

If we want to keep the concept of ISR, the algorithm could be the
following. We maintain high watermark and ISR as they are today. The
producer will only receive an ack when Q replicas have received a record.
The leader will be maintaining a special high watermark (say Q-HW), which
is the min log end offset of the fastest Q in-sync replicas. In that mode,
the consumer can read records up to Q-HW. When the leader fails, the
replica in the remaining live in-sync replicas with the longest log will be
selected as the new leader. The guarantee is then if there are fewer than Q
replica failures, there is no data loss to the consumer.
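
To illustrate the Q-HW computation described above, here is a small sketch. It
is purely illustrative and not how the broker code is structured.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class QuorumHighWatermark {
        // Q-HW = the minimum log end offset of the fastest Q in-sync replicas,
        // i.e. the Q-th largest LEO among the ISR.
        public static long quorumHighWatermark(List<Long> isrLogEndOffsets, int quorum) {
            if (isrLogEndOffsets.size() < quorum) {
                throw new IllegalStateException("ISR is smaller than the quorum size");
            }
            List<Long> sorted = new ArrayList<>(isrLogEndOffsets);
            sorted.sort(Collections.reverseOrder());
            return sorted.get(quorum - 1);
        }
    }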

A third question is whether the leader election should involve the
controller. Colin has several good points on the controller-less approach.
However, the pros for the controller approach are:
1. It batches the leader election and the propagation of the metadata when
leader changes. These lead to many fewer cross broker RPC requests during
leader election.
2. It serializes different events that may require leader change (e.g.,
reassign partitions, controlled shutdown, preferred leader election, etc).
This avoids conflicting decisions made at the same time and makes it easier
to reason about the sequencing of the metadata.

Supporting the above in a controller-less approach is possible, but
probably requires more work. So, perhaps this can be revisited in the
future.

Thanks,

Jun


On Thu, Mar 8, 2018 at 8:47 AM, Jason Gustafson  wrote:

> Initially I thought this proposal was just about improving producer
> latency. So acks=quorum (or whatever) was sort of the midway point between
> acks=1 and acks=all, but offsets would only be exposed according to current
> high watermark semantics (meaning full ISR replication). However, it looks
> like we are also trying to allow messages to be exposed to consumers as
> soon as we have achieved replication to a quorum. In this case, I don't
> think the ISR leader election makes sense any longer unless we're willing
> to accept consumers seeing uncommitted data (which seems like a bad
> regression to me). Probably the only way you can avoid it is to also
> require a quorum for leader election. But maybe that's not so bad? If a
> quorum is not present, producers wouldn't be able to make progress anyway,
> so the only advantage of allowing election from a minority of replicas is
> potentially the ability to serve old reads. That should be possible from
> any replica anyway as long as we know that the data has been committed. So
> effectively we would be giving users a 2f + 1 option. And I agree with
> Colin that if we had that, future work could investigate moving the leader
> election out of the controller.
>
> -Jason
>
> On Tue, Mar 6, 2018 at 6:45 PM, Jun Rao  wrote:
>
> > Hi, Colin,
> >
> > A couple of thoughts on your previous comments.
> >
> > 1. I am not sure if the controller is necessarily a bottleneck for leader
> > election. In the common case, leader election is due to a single broker
> > failure. So, the controller only needs to change the leaders for the
> number
> > of partitions on that broker, not for the whole cluster. Also, the
> > controller can do the leader election for all affected partitions in a
> > batch fashion and communicate the new leader for all partitions in a
> single
> > RPC request.
> >
> > 2. Keeping the metadata such as ISR in ZK in the quorum mode has some
> > benefits. Suppose that a user wants to wait for a quorum of 2 out of 4
> > replicas. At some point, replicas A and B are in sync, and replicas C
> and D
> > are out of sync. Later on, replicas A and B fail. By checking ISR, we can
> > choose not to elect a new leader from replicas C and D to avoid unclean
> > leader election. If we don't maintain ISR, this would be hard.
> >
> > Hi, Litao,
> >
> > At the high level, 

[DISCUSS] KIP-272: Add API version tag to broker's RequestsPerSec metric

2018-03-19 Thread Allen Wang
Hi all,

I have created KIP-272: Add API version tag to broker's RequestsPerSec
metric.

Here is the link to the KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-272%3A+Add+API+version+tag+to+broker%27s+RequestsPerSec+metric

Looking forward to the discussion.

Thanks,
Allen


Re: [DISCUSS] KIP-270 A Scala wrapper library for Kafka Streams

2018-03-19 Thread Ismael Juma
Matthias,

You have to choose between the package and class names when it comes to
adding a suffix (or prefix). I think the package name is the lesser of two
evils, but it would be interesting to know what others think.

I agree that we should include the information in the KIP (whatever we
decide).

Ismael

On Mon, Mar 19, 2018 at 3:30 PM, Matthias J. Sax 
wrote:

> About the package name.
>
> Would it be better/cleaner to omit the `scala` sub-package? Or is this
> required to avoid naming conflicts with the Java classes? If yes, please
> point it out in the KIP.
>
>
> -Matthias
>
> On 3/19/18 11:24 AM, Debasish Ghosh wrote:
> > Cool .. so the changes that u suggested e.g the WordCount example,
> package
> > name changes etc. will be in the PR only and not in the KIP document ..
> >
> > Or we keep the KIP document updated as well ?
> >
> > regards.
> >
> > On Mon, 19 Mar 2018 at 11:46 PM, Guozhang Wang 
> wrote:
> >
> >> Hi Debasish,
> >>
> >> You can work on the PR in parallel to the KIP discussion so that people
> can
> >> start reviewing it. The only restriction is that the PR cannot be merged
> >> until the KIP is accepted.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Mar 19, 2018 at 11:09 AM, Debasish Ghosh <
> >> debasish.gh...@lightbend.com> wrote:
> >>
> >>> Based on this discussion I have a question ..
> >>>
> >>> For the actual implementation we need to have a PR on
> >>> https://github.com/apache/kafka and the new library will be in the
> >> package
> >>> structure org.apache.kafka.streams.scala. My question is, is this PR
> part
> >>> of the KIP ? Or we work on the PR once the KIP is accepted ..
> >>>
> >>> regards.
> >>>
> >>> On Mon, Mar 19, 2018 at 11:28 PM, Debasish Ghosh <
> >>> debasish.gh...@lightbend.com> wrote:
> >>>
>  Thanks Guozhang for the comments .. we will start working on them ..
> 
>  regards.
> 
>  On Mon, Mar 19, 2018 at 11:02 PM, Guozhang Wang 
>  wrote:
> 
> > And one more comment about the type safety:
> >
> > 7. I'm in favor of the approach of enforcing type safety at compile
> >>> time,
> > since with Scala's strong typing system I think it makes more sense
> to
> >>> get
> > rid of config-based serde specifications that can cause runtime error
> >> in
> > the Scala wrapper.
> >
> >
> > Guozhang
> >
> >
> > On Mon, Mar 19, 2018 at 10:28 AM, Guozhang Wang 
> > wrote:
> >
> >> Hello Debasish,
> >>
> >> Thanks for the KIP. Here are some comments:
> >>
> >> 1. About the naming: I'd also vote for option 2 and enforce users to
> >> rename when import since this scenario seems rare to me.
> >>
> >> 2. About the dependency: since this KIP can only be merged as early
> >> as
> >> 1.2, it means that for users who wants to use this artifact, say for
> >> version 1.2, she would need to bring in "kafka-streams" version 1.2.
> >>> In
> >> addition, if we change the Java API in the future versions we'd also
> > need
> >> to update the Scala wrapper at the same version as well, so in other
> > words
> >> "kafka-streams-scala" version X have to be with "kafka-streams"
> >>> version
> > X
> >> anyways. So I'd suggest we remove the version number in
> > "kafka-streams-scala
> >> only depends on the Scala standard library and Kafka Streams" as it
> >>> may
> >> be confusing to readers.
> >>
> >> 3. For the KIP discussion, it need be based on the proposed state of
> > this
> >> project in AK. More specifically, for the groupId in maven / gradle,
> >>> it
> >> need to be "org.apache.kafka"; for the version, it need to be Kafka
> > release
> >> versions, e.g. 1.2.0.
> >>
> >> 4. In "New or Changed Public Interfaces" section, it does not
> >> mention
> > the
> >> serde classes like "DefaultSerdes", but I think they should also be
> >> considered public APIs as users would most likely import these in
> >> her
> >> implementation.
> >>
> >> 5. Could you also list the changes you'd propose to made to
> >>> build.gradle
> >> in kafka for adding this artifact? More details will help readers to
> > better
> >> understand your proposal.
> >>
> >> 6. I think it'd will be good to have a WordCount example code as
> >> part
> >>> of
> >> this KIP, to illustrate how to code in this Scala wrapper, e.g. in
> >> o.a.k.scala.examples. But for this class we probably do not need to
> > have a
> >> separate artifact for it as we did in kafka-streams-examples.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Mar 19, 2018 at 9:16 AM, Ted Yu 
> >> wrote:
> >>
> >>> I agree with Sean on name unification.
> >>>
> >>> +1 to option 2.
> >>>
> >>> On Mon, Mar 19, 2018 at 7:23 AM, Sean Glover <
> > 

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-19 Thread Ismael Juma
Hi,

An option that is not currently covered in the KIP is to have a separate
config max.block.ms, which is similar to the producer config with the same
name. This came up during the KAFKA-2391 discussion. I think it's clear
that we can't rely on request.timeout.ms, so the decision is between adding
overloads or adding a new config. People seemed to be leaning towards the
latter in KAFKA-2391, but Jason makes a good point that the overloads are
more flexible. A couple of questions from me:

1. Do we need the additional flexibility?
2. If we do, do we need it for every blocking method?

Ismael
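
To make the two options concrete, here is a rough sketch of the shapes being
discussed. The consumer-side max.block.ms config and the exact overload
signatures are proposals under discussion, not the final API.

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Option A: per-call overloads that take an explicit timeout.
    interface BoundedConsumerCalls {
        long position(TopicPartition partition, long timeout, TimeUnit unit);
        OffsetAndMetadata committed(TopicPartition partition, long timeout, TimeUnit unit);
        void commitSync(long timeout, TimeUnit unit);
    }

    // Option B: a single consumer config bounding all blocking calls, e.g.
    //   props.put("max.block.ms", "5000");   // hypothetical consumer config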

On Mon, Mar 19, 2018 at 5:03 PM, Richard Yu 
wrote:

> Hi Guozhang,
>
> I made some clarifications to KIP-266, namely:
> 1. Stated more specifically that commitSync will accept user input.
> 2. fetchCommittedOffsets(): Made its role in blocking more clear to the
> reader.
> 3. Sketched what would happen when time limit is exceeded.
>
> These changes should make the KIP easier to understand.
>
> Cheers,
> Richard
>
> On Mon, Mar 19, 2018 at 9:33 AM, Guozhang Wang  wrote:
>
> > Hi Richard,
> >
> > I made a pass over the KIP again, some more clarifications / comments:
> >
> > 1. seek() call itself is not blocking, only the following poll() call may
> > be blocking as the actual metadata request will happen.
> >
> > 2. I saw you did not include Consumer.partitionFor(),
> > Consumer.OffsetAndTimestamp() and Consumer.listTopics() in your KIP.
> After
> > a second thought, I think this may be a better idea to not tackle them in
> > the same KIP, and probably we should consider whether we would change the
> > behavior or not in another discussion. So I agree to not include them.
> >
> > 3. In your wiki you mentioned "Another change shall be made to
> > KafkaConsumer#poll(), due to its call to updateFetchPositions() which
> > blocks indefinitely." This part may a bit obscure to most readers who's
> not
> > familiar with the KafkaConsumer internals, could you please add more
> > elaborations. More specifically, I think the root causes of the public
> APIs
> > mentioned are a bit different while the KIP's explanation sounds like
> they
> > are due to the same reason:
> >
> > 3.1 fetchCommittedOffsets(): this internal call will block forever if the
> > committed offsets cannot be fetched successfully and affect position()
> and
> > committed(). We need to break out of its internal while loop.
> > 3.2 position() itself will while loop when offsets cannot be retrieved in
> > the underlying async call. We need to break out this while loop.
> > 3.3 commitSync() passed Long.MAX_VALUE as the timeout value, we should
> take
> > the user specified timeouts when applicable.
> >
> >
> >
> > Guozhang
> >
> > On Sat, Mar 17, 2018 at 4:44 PM, Richard Yu 
> > wrote:
> >
> > > Actually, what I said above is inaccurate. In
> > > testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue blocks,
> not
> > > seek.
> > > My assumption is that seek did not update correctly. I will be digging
> > > further into this.
> > >
> > >
> > >
> > > On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu <
> yohan.richard...@gmail.com>
> > > wrote:
> > >
> > > > One more thing: when looking through tests, I have realized that
> seek()
> > > > methods can potentially block indefinitely. As you well know, seek()
> is
> > > > called when pollOnce() or position() is active. Thus, if position()
> > > blocks
> > > > indefinitely, then so would seek(). Should bounding seek() also be
> > > included
> > > > in this KIP?
> > > >
> > > > Thanks, Richard
> > > >
> > > > On Sat, Mar 17, 2018 at 1:16 PM, Richard Yu <
> > yohan.richard...@gmail.com>
> > > > wrote:
> > > >
> > > >> Thanks for the advice, Jason
> > > >>
> > > >> I have modified KIP-266 to include the java doc for committed() and
> > > other
> > > >> blocking methods, and I also
> > > >> mentioned poll() which will also be bounded. Let me know if there is
> > > >> anything else. :)
> > > >>
> > > >> Sincerely, Richard
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson <
> ja...@confluent.io
> > >
> > > >> wrote:
> > > >>
> > > >>> Hi Richard,
> > > >>>
> > > >>> Thanks for the updates. I'm really glad you picked this up. A
> couple
> > > >>> minor
> > > >>> comments:
> > > >>>
> > > >>> 1. Can you list the full set of new APIs explicitly in the KIP?
> > > >>> Currently I
> > > >>> only see the javadoc for `position()`.
> > > >>>
> > > >>> 2. We should consider adding `TimeUnit` to the new methods to avoid
> > > unit
> > > >>> confusion. I know it's inconsistent with the poll() API, but I
> think
> > it
> > > >>> was
> > > >>> probably a mistake not to include it there, so better not to double
> > > down
> > > >>> on
> > > >>> that mistake. And note that we do already have `close(long,
> > TimeUnit)`.
> > > >>>
> > > >>> Other than that, I think the current KIP seems reasonable.
> > > >>>
> > > 

Build failed in Jenkins: kafka-trunk-jdk7 #3273

2018-03-19 Thread Apache Jenkins Server
See 


Changes:

[rajinisivaram] MINOR: Fix some compiler warnings (#4726)

--
[...truncated 1.54 MB...]
at hudson.model.AbstractProject.poll(AbstractProject.java:1287)
at hudson.triggers.SCMTrigger$Runner.runPolling(SCMTrigger.java:594)
at hudson.triggers.SCMTrigger$Runner.run(SCMTrigger.java:640)
at 
hudson.util.SequentialExecutionQueue$QueueEntry.run(SequentialExecutionQueue.java:119)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInnerRepartitioned[caching enabled = true] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInnerRepartitioned[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuterRepartitioned[caching enabled = true] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuterRepartitioned[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testMultiInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testMultiInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeftRepartitioned[caching enabled = true] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeftRepartitioned[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInnerRepartitioned[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInnerRepartitioned[caching enabled = false] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuterRepartitioned[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuterRepartitioned[caching enabled = false] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuter[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuter[caching enabled = false] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeft[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeft[caching enabled = false] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testMultiInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testMultiInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeftRepartitioned[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeftRepartitioned[caching enabled = false] PASSED
ERROR: Could not install GRADLE_3_5_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:895)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:458)
at 
hudson.plugins.git.GitSCM.compareRemoteRevisionWithImpl(GitSCM.java:666)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWith(GitSCM.java:631)
at hudson.scm.SCM.compareRemoteRevisionWith(SCM.java:391)
at hudson.scm.SCM.poll(SCM.java:408)
at hudson.model.AbstractProject._poll(AbstractProject.java:1384)
   

Re: [DISCUSS] KIP-270 A Scala wrapper library for Kafka Streams

2018-03-19 Thread Matthias J. Sax
About the package name.

Would it be better/cleaner to omit the `scala` sub-package? Or is this
required to avoid naming conflicts with the Java classes? If yes, please
point it out in the KIP.


-Matthias

On 3/19/18 11:24 AM, Debasish Ghosh wrote:
> Cool .. so the changes that u suggested e.g the WordCount example, package
> name changes etc. will be in the PR only and not in the KIP document ..
> 
> Or we keep the KIP document updated as well ?
> 
> regards.
> 
> On Mon, 19 Mar 2018 at 11:46 PM, Guozhang Wang  wrote:
> 
>> Hi Debasish,
>>
>> You can work on the PR in parallel to the KIP discussion so that people can
>> start reviewing it. The only restriction is that the PR cannot be merged
>> until the KIP is accepted.
>>
>>
>> Guozhang
>>
>>
>> On Mon, Mar 19, 2018 at 11:09 AM, Debasish Ghosh <
>> debasish.gh...@lightbend.com> wrote:
>>
>>> Based on this discussion I have a question ..
>>>
>>> For the actual implementation we need to have a PR on
>>> https://github.com/apache/kafka and the new library will be in the
>> package
>>> structure org.apache.kafka.streams.scala. My question is, is this PR part
>>> of the KIP ? Or we work on the PR once the KIP is accepted ..
>>>
>>> regards.
>>>
>>> On Mon, Mar 19, 2018 at 11:28 PM, Debasish Ghosh <
>>> debasish.gh...@lightbend.com> wrote:
>>>
 Thanks Guozhang for the comments .. we will start working on them ..

 regards.

 On Mon, Mar 19, 2018 at 11:02 PM, Guozhang Wang 
 wrote:

> And one more comment about the type safety:
>
> 7. I'm in favor of the approach of enforcing type safety at compile
>>> time,
> since with Scala's strong typing system I think it makes more sense to
>>> get
> rid of config-based serde specifications that can cause runtime error
>> in
> the Scala wrapper.
>
>
> Guozhang
>
>
> On Mon, Mar 19, 2018 at 10:28 AM, Guozhang Wang 
> wrote:
>
>> Hello Debasish,
>>
>> Thanks for the KIP. Here are some comments:
>>
>> 1. About the naming: I'd also vote for option 2 and enforce users to
>> rename when import since this scenario seems rare to me.
>>
>> 2. About the dependency: since this KIP can only be merged as early
>> as
>> 1.2, it means that for users who wants to use this artifact, say for
>> version 1.2, she would need to bring in "kafka-streams" version 1.2.
>>> In
>> addition, if we change the Java API in the future versions we'd also
> need
>> to update the Scala wrapper at the same version as well, so in other
> words
>> "kafka-streams-scala" version X have to be with "kafka-streams"
>>> version
> X
>> anyways. So I'd suggest we remove the version number in
> "kafka-streams-scala
>> only depends on the Scala standard library and Kafka Streams" as it
>>> may
>> be confusing to readers.
>>
>> 3. For the KIP discussion, it need be based on the proposed state of
> this
>> project in AK. More specifically, for the groupId in maven / gradle,
>>> it
>> need to be "org.apache.kafka"; for the version, it need to be Kafka
> release
>> versions, e.g. 1.2.0.
>>
>> 4. In "New or Changed Public Interfaces" section, it does not
>> mention
> the
>> serde classes like "DefaultSerdes", but I think they should also be
>> considered public APIs as users would most likely import these in
>> her
>> implementation.
>>
>> 5. Could you also list the changes you'd propose to made to
>>> build.gradle
>> in kafka for adding this artifact? More details will help readers to
> better
>> understand your proposal.
>>
>> 6. I think it'd will be good to have a WordCount example code as
>> part
>>> of
>> this KIP, to illustrate how to code in this Scala wrapper, e.g. in
>> o.a.k.scala.examples. But for this class we probably do not need to
> have a
>> separate artifact for it as we did in kafka-streams-examples.
>>
>>
>> Guozhang
>>
>>
>> On Mon, Mar 19, 2018 at 9:16 AM, Ted Yu 
>> wrote:
>>
>>> I agree with Sean on name unification.
>>>
>>> +1 to option 2.
>>>
>>> On Mon, Mar 19, 2018 at 7:23 AM, Sean Glover <
> sean.glo...@lightbend.com>
>>> wrote:
>>>
 Type names: I vote for option 2.  The user must explicitly add a
>>> dependency
 to this library and the wrapper types are in a different package.
>>> It
>>> seems
 reasonable to expect them to do an import rename if there's a
>> need
>>> to
>>> drop
 down to the Java API.

 Test Utils: The test utils in kafka-streams-scala are nice and
>>> lean,
> but
 I'm not sure if it provides much more value than other options
>> that
>>> exist
 in the community.  There's an embedded Kafka/ZK project
> implementation
>>> for
 

Re: [DISCUSS] KIP-272: Add API version tag to broker's RequestsPerSec metric

2018-03-19 Thread Ted Yu
bq. additional hash lookup is needed when updating the metric to locate
the metric

Do you have an estimate of how much memory is needed for maintaining the hash
map?

Thanks

On Mon, Mar 19, 2018 at 3:19 PM, Allen Wang  wrote:

> Hi all,
>
> I have created KIP-272: Add API version tag to broker's RequestsPerSec
> metric.
>
> Here is the link to the KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 272%3A+Add+API+version+tag+to+broker%27s+RequestsPerSec+metric
>
> Looking forward to the discussion.
>
> Thanks,
> Allen
>


Re: Subject: [VOTE] 1.1.0 RC3

2018-03-19 Thread Ismael Juma
Vahid,

The Java 9 Connect issue is similar to the one being fixed for Trogdor in
the following PR:

https://github.com/apache/kafka/pull/4725

We need to do something similar for Connect.

Ismael

On Fri, Mar 16, 2018 at 3:10 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi Damian,
>
> Thanks for running the release.
>
> I tried building from source and running the quick start on Linux &
> Windows with both Java 8 & 9.
> Here's the result:
>
> +-----------------+---------+---------+
> |                 |  Linux  | Windows |
> +                 +----+----+----+----+
> |                 | J8 | J9 | J8 | J9 |
> +-----------------+----+----+----+----+
> |  Build          |  + |  + |  + |  + |
> +-----------------+----+----+----+----+
> |  Single broker  |  + |  + |  + |  + |
> | produce/consume |    |    |    |    |
> +-----------------+----+----+----+----+
> | Connect         |  + |  ? |  - |  - |
> +-----------------+----+----+----+----+
> | Streams         |  + |  + |  + |  + |
> +-----------------+----+----+----+----+
>
> ?: Connect quickstart on Linux with Java 9 runs but the connect tool
> throws a bunch of exceptions (https://www.codepile.net/pile/yVg8XJB8)
> -: Connect quickstart on Windows fails (Java 8:
> https://www.codepile.net/pile/xJGra6BP, Java 9:
> https://www.codepile.net/pile/oREYeORK)
>
> Given that Windows is not an officially supported platform, and the
> exceptions with Linux/Java 9 are not breaking the functionality, my vote
> is a +1 (non-binding).
>
> Thanks.
> --Vahid
>
>
>
>
> From:   Damian Guy 
> To: dev@kafka.apache.org, us...@kafka.apache.org,
> kafka-clie...@googlegroups.com
> Date:   03/15/2018 07:55 AM
> Subject:Subject: [VOTE] 1.1.0 RC3
>
>
>
> Hello Kafka users, developers and client-developers,
>
> This is the fourth candidate for release of Apache Kafka 1.1.0.
>
> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> apache.org_confluence_pages_viewpage.action-3FpageId-
> 3D75957546=DwIBaQ=jf_iaSHvJObTbx-siA1ZOg=Q_
> itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc=Qn2GySTKcOV5MFr3WDl63BDv7pTd2s
> gX46mjZPws01U=cKgJtQXXRauZ3HSAoSbsC9SLVTAkO-pbLdPrOCBuJzE=
>
>
> A few highlights:
>
> * Significant Controller improvements (much faster and session expiration
> edge cases fixed)
> * Data balancing across log directories (JBOD)
> * More efficient replication when the number of partitions is large
> * Dynamic Broker Configs
> * Delegation tokens (KIP-48)
> * Kafka Streams API improvements (KIP-205 / 210 / 220 / 224 / 239)
>
>
> Release notes for the 1.1.0 release:
> https://urldefense.proofpoint.com/v2/url?u=http-3A__home.
> apache.org_-7Edamianguy_kafka-2D1.1.0-2Drc3_RELEASE-5FNOTES.
> html=DwIBaQ=jf_iaSHvJObTbx-siA1ZOg=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> kjJc7uSVcviKUc=Qn2GySTKcOV5MFr3WDl63BDv7pTd2sgX46mjZPws01U=
> 26FgbzRhKImhxyEkB4KzDPG-l8W_Y99xU6LykOAgpFI=
>
>
> *** Please download, test and vote by Monday, March 19, 9am PDT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://urldefense.proofpoint.com/v2/url?u=http-3A__kafka.
> apache.org_KEYS=DwIBaQ=jf_iaSHvJObTbx-siA1ZOg=Q_
> itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc=Qn2GySTKcOV5MFr3WDl63BDv7pTd2s
> gX46mjZPws01U=xlnrfgxVFMRCKk8pTOhujyC-Um4ogtsxK6Xwks6mc3U=
>
>
> * Release artifacts to be voted upon (source and binary):
> https://urldefense.proofpoint.com/v2/url?u=http-3A__home.
> apache.org_-7Edamianguy_kafka-2D1.1.0-2Drc3_=DwIBaQ=jf_
> iaSHvJObTbx-siA1ZOg=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc=
> Qn2GySTKcOV5MFr3WDl63BDv7pTd2sgX46mjZPws01U=
> ulHUeYnWIp28Gsn4VV1NK3FrGV4Jn5rUpuU6tvgekME=
>
>
> * Maven artifacts to be voted upon:
> https://urldefense.proofpoint.com/v2/url?u=https-3A__
> repository.apache.org_content_groups_staging_=DwIBaQ=jf_
> iaSHvJObTbx-siA1ZOg=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc=
> Qn2GySTKcOV5MFr3WDl63BDv7pTd2sgX46mjZPws01U=G9o4hXVXF0bjL_
> a3Wocod9GUEfy9WBBgoGa2u6yFKQw=
>
>
> * Javadoc:
> https://urldefense.proofpoint.com/v2/url?u=http-3A__home.
> apache.org_-7Edamianguy_kafka-2D1.1.0-2Drc3_javadoc_=
> DwIBaQ=jf_iaSHvJObTbx-siA1ZOg=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> kjJc7uSVcviKUc=Qn2GySTKcOV5MFr3WDl63BDv7pTd2sgX46mjZPws01U=
> 2auaI4IIJhEORGYm1Kdpxt5TDHh0PzSvtK77lC3SJVY=
>
>
> * Tag to be voted upon (off 1.1 branch) is the 1.1.0 tag:
> https://urldefense.proofpoint.com/v2/url?u=https-3A__github.
> com_apache_kafka_tree_1.1.0-2Drc3=DwIBaQ=jf_iaSHvJObTbx-siA1ZOg=Q_
> itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc=Qn2GySTKcOV5MFr3WDl63BDv7pTd2s
> gX46mjZPws01U=h7G8XPD8vAWl_gqySi2Iocag5NnP32IT_PyirPC3Lss=
>
>
>
> * Documentation:
> https://urldefense.proofpoint.com/v2/url?u=http-3A__kafka.
> apache.org_11_documentation.html=DwIBaQ=jf_iaSHvJObTbx-siA1ZOg=Q_
> itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc=Qn2GySTKcOV5MFr3WDl63BDv7pTd2s
> gX46mjZPws01U=KcPsL867-tPQxKPC4ufl5tqg9RSL3lxwsgKhOxeA7t0=
>
> <
> https://urldefense.proofpoint.com/v2/url?u=http-3A__kafka.
> apache.org_1_documentation.html=DwIBaQ=jf_iaSHvJObTbx-siA1ZOg=Q_
> 

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-19 Thread Matthias J. Sax
Guozhang,

thanks for your comments.

2: I think my main concern is that 1.2 would be a "special" release that
everybody needs to use to upgrade. As an alternative, we could say that
we add the config in 1.2 and keep it for 2 additional releases (1.3 and
1.4) but remove it in 1.5. This gives users more flexibility, does not
force users to upgrade to a specific version, and also allows us
to not carry the tech debt forever. WDYT about this? If users upgrade on
a regular basis, this approach avoids a forced update with high
probability, as they will upgrade to either 1.2/1.3/1.4 anyway at some
point. Thus, only if users don't upgrade for a very long time are they
forced to do 2 upgrades with an intermediate version.

4. Updated the KIP to remove the ".x" suffix

5. Updated the KIP accordingly.

-Matthias
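
For context, the rolling-bounce usage under discussion would look roughly like
this on the first bounce. The config key and value format follow the KIP
proposal and may still change.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class UpgradeFromExample {
        public static Properties firstBounceConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Tell the new binaries that older (e.g. 0.10.0) instances are still in
            // the group; the config is removed again for the second rolling bounce.
            props.put("upgrade.from", "0.10.0");
            return props;
        }
    }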

On 3/19/18 10:33 AM, Guozhang Wang wrote:
> Yup :)
> 
> On Mon, Mar 19, 2018 at 10:01 AM, Ted Yu  wrote:
> 
>> bq. some snippet like ProduceRequest / ProduceRequest
>>
>> Did you mean ProduceRequest / Response ?
>>
>> Cheers
>>
>> On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang  wrote:
>>
>>> Hi Matthias,
>>>
>>> About 2: yeah I guess this is a subjective preference. My main concern
>>> about keeping the config / handling code beyond 1.2 release is that it
>> will
>>> become a non-cleanable tech debt forever, as fewer and fewer users would
>>> need to upgrade from 0.10.x and 1.1.x, and eventually we will need to
>>> maintain this for nearly no one. On the other hand, I agree that this
>> tech
>>> debt is not too large. So if more people feel this is a good tradeoff to
>>> pay for not enforcing users from older versions to upgrade twice I'm
>> happy
>>> to change my opinion.
>>>
>>> A few more minor comments:
>>>
>>> 4. For the values of "upgrade.from", could we simplify to only major.minor?
>>> I.e. "0.10.0" than "0.10.0.x" ? Since we never changed compatibility
>>> behavior in bug fix releases we would not need to specify a bug-fix
>> version
>>> to distinguish ever.
>>>
>>> 5. Could you also present the encoding format in subscription /
>> assignment
>>> metadata bytes in version 2, and in future versions (i.e. which first
>> bytes
>>> would be kept moving forward), for readers to better understand the
>>> proposal? some snippet like ProduceRequest / ProduceRequest in
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>>> would be very helpful.
>>>
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax 
>>> wrote:
>>>
 Thanks for your comments.

 1. Because the old leader cannot decode the new Subscription it can
>> only
 send an empty assignment back. The idea to send empty assignments to
>> all
 members is interesting. I will try this out in an PR to see how it
>>> behaves.

 2. I don't see an issue with keeping config `upgrade.from` for future
 releases. Personally, I would prefer to not force users to do two
 upgrades if they want to go from pre-1.2 to post-1.2 version. Is there
>> a
 technical argument why you want to get rid of the config? What
 disadvantages do you see keeping `upgrade.from` beyond 1.2 release?

 Keeping the config is just a few lines of code in `StreamsConfig` as
 well we a single `if` statement in `StreamsPartitionAssignor` to force
>> a
 downgrade (cf
 https://github.com/apache/kafka/pull/4636/files#diff-
 392371c29384e33bb09ed342e7696c68R201)


 3. I updated the KIP accordingly.


 -Matthias

 On 3/15/18 3:19 PM, Guozhang Wang wrote:
> Hello Matthias, thanks for the KIP. Here are some comments:
>
> 1. "For all other instances the leader sends a regular Assignment in
> version X back." Does that mean the leader will exclude any member of
>>> the
> group whose protocol version that it does not understand? For
>> example,
>>> if
> we have A, B, C with A the leader, and B bounced with the newer
>>> version.
 In
> the first rebalance, A will only consider {A, C} for assignment while
> sending empty assignment to B. And then later when B downgrades will
>> it
> re-assign the tasks to it again? I felt this is unnecessarily
>>> increasing
> the num. rebalances and the total latency. Could the leader just
>> sends
> empty assignment to everyone, and since upon receiving the empty
 assignment
> each thread will not create / restore any tasks and will not clean up
>>> its
> local state (so that the prevCachedTasks are not lost in future
 rebalances)
> and re-joins immediately, if users choose to bounce an instance once
>> it
 is
> in RUNNING state the total time of rolling upgrades will be reduced.
>
> 2. If we want to allow upgrading from 1.1- versions to any of the
>>> future
> versions beyond 1.2, then we'd always need to keep the 

Jenkins build is back to normal : kafka-trunk-jdk7 #3274

2018-03-19 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-03-19 Thread Ismael Juma
Clarification: it's a by-name parameter, so nothing happens unless the first
parameter evaluates to false.

Ismael

On Mon, 19 Mar 2018, 19:21 Jay Kreps,  wrote:

> Optimizing startup seems really valuable but I'm a little confused by this.
>
> There are two different things:
> 1. Recovery
> 2. Sanity check
>
> The terminology we're using is a bit mixed here.
>
> Recovery means checksumming the log segments and rebuilding the index on a
> hard crash. This only happens on unflushed segments, which is generally
> just the last segment. Recovery is essential for the correctness guarantees
> of the log and you shouldn't disable it. It only happens on hard crash and
> is not a factor in graceful restart. We can likely optimize it but that
> would make most sense to do in a data driven fashion off some profiling.
>
> However there is also a ton of disk activity that happens during
> initialization (lots of checks on the file size, absolute path, etc). I
> think these have crept in over time with people not really realizing this
> code is perf sensitive and java hiding a lot of what is and isn't a file
> operation. One part of this is the sanityCheck() call for the two indexes.
> I don't think this call reads the full index, just the last entry in the
> index, right?. There should be no need to read the full index except during
> recovery (and then only for the segments being recovered). I think it would
> make a ton of sense to optimize this but I don't think that optimization
> needs to be configurable as this is just a helpful sanity check to detect
> common non-sensical things in the index files, but it isn't part of the
> core guarantees, in general you aren't supposed to lose committed data from
> disk, and if you do we may be able to fail faster but we fundamentally
> can't really help you. Again I think this would make the most sense to do
> in a data driven way, if you look at that code I think it is doing crazy
> amounts of file operations (e.g. getAbsolutePath, file sizes, etc). I think
> it'd make most sense to profile startup with a cold cache on a large log
> directory and do the same with an strace to see how many redundant system
> calls we do per segment and what is costing us and then cut some of this
> out. I suspect we could speed up our startup time quite a lot if we did
> that.
>
> For example we have a bunch of calls like this:
>
> require(len % entrySize == 0,
>   "Index file " + file.getAbsolutePath + " is corrupt, found " + len +
>   " bytes which is not positive or not a multiple of 8.")
> I'm pretty sure file.getAbsolutePath is a system call and I assume that
> happens whether or not you fail the in-memory check?
>
> -Jay
>
>
> On Sun, Feb 25, 2018 at 10:27 PM, Dong Lin  wrote:
>
> > Hi all,
> >
> > I have created KIP-263: Allow broker to skip sanity check of inactive
> > segments on broker startup. See
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 263%3A+Allow+broker+to+skip+sanity+check+of+inactive+
> > segments+on+broker+startup
> > .
> >
> > This KIP provides a way to significantly reduce time to rolling bounce a
> > Kafka cluster.
> >
> > Comments are welcome!
> >
> > Thanks,
> > Dong
> >
>


Re: [DISCUSS] KIP-242: Mask password fields in Kafka Connect REST response

2018-03-19 Thread Ewen Cheslack-Postava
SSL authentication was added in KIP-208, which will be included in Kafka
1.1.0:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface

Connect isn't much different from the core Kafka/client configs currently
where in some security setups you need to pass in passwords directly, and
since there are various dynamic broker config improvements in the works, the
fact that Connect exposes these in a REST API doesn't make it any
different. I think the real long term solution to this is to add pluggable
password support where you could, e.g., get these values out of a separate
secrets management system instead of specifying them directly. Masking
passwords as described in this solution feels more like a temporary
workaround; to be able to edit and update these connector configs through
the REST API, we'd have to address these issues anyway.
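
For concreteness, here is a rough sketch (mine, not part of the message) of
what ConfigDef-driven masking could look like -- replacing the value of every
PASSWORD-typed key with a fixed placeholder; the placeholder string and the
helper name are made up:

    import org.apache.kafka.common.config.ConfigDef;

    import java.util.HashMap;
    import java.util.Map;

    public class ConfigMasker {
        private static final String MASK = "[hidden]";  // placeholder value is arbitrary

        // Returns a copy of the config with PASSWORD-typed values replaced by MASK.
        static Map<String, String> maskSensitive(ConfigDef def, Map<String, String> config) {
            Map<String, String> result = new HashMap<>(config);
            def.configKeys().forEach((name, key) -> {
                if (key.type == ConfigDef.Type.PASSWORD && result.containsKey(name))
                    result.put(name, MASK);
            });
            return result;
        }
    }

A real implementation would live somewhere central (e.g. in the herder), since
only the connector's own ConfigDef knows which keys are passwords.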

-Ewen

On Mon, Mar 19, 2018 at 2:33 PM, Matt Farmer  wrote:

> What’s the status of this? This is a pretty hard blocker for us to meet
> requirements internally to deploy connect in a distributed fashion.
>
> @Ewen - Regarding the concern of accessing information securely - has
> there been any consideration of adding authentication to the connect api?
>
> > On Jan 17, 2018, at 3:55 PM, Randall Hauch  wrote:
> >
> > Vincent,
> >
> > Can the KIP more explicitly say that this is opt-in, and that by default
> > nothing will change?
> >
> > Randall
> >
> > On Tue, Jan 16, 2018 at 11:18 PM, Ewen Cheslack-Postava <
> e...@confluent.io>
> > wrote:
> >
> >> Vincent,
> >>
> >> I think with the addition of a configuration to control this for
> >> compatibility, people would generally be ok with it. If you want to
> start a
> >> VOTE thread, the KIP deadline is coming up and the PR looks pretty
> small. I
> >> will take a pass at reviewing the PR so we'll be ready to merge if we
> can
> >> get the KIP voted through.
> >>
> >> Thanks,
> >> Ewen
> >>
> >> On Fri, Jan 12, 2018 at 10:18 AM, Vincent Meng  wrote:
> >>
> >>> @Ted: The issue is kinda hard to reproduce. It's just something we
> >> observe
> >>> over time.
> >>>
> >>> @Ewen: I agree. Opt-in seems to be a good solution to me. To your
> >> question,
> >>> if there is no ConfDef that defines which fields are Passwords we can
> >> just
> >>> return the config as is.
> >>>
> >>> There is a PR for this KIP already. Comments/Discussions are welcome.
> >>> https://github.com/apache/kafka/pull/4269
> >>>
> >>> On Tue, Jan 2, 2018 at 8:52 PM, Ewen Cheslack-Postava <
> e...@confluent.io
> >>>
> >>> wrote:
> >>>
>  Vincent,
> 
>  Thanks for the KIP. This is definitely an issue we know is a problem
> >> for
>  some users.
> 
>  I think the major problem with the KIP as-is is that it makes it
> >>> impossible
>  to get the original value back out of the API. This KIP probably ties
> >> in
>  significantly with ideas for securing the REST API (SSL) and adding
> >> ACLs
> >>> to
>  it. Both are things we know people want, but haven't happened yet.
> >>> However,
>  it also interacts with other approaches to adding those features, e.g.
>  layering proxies on top of the existing API (e.g. nginx, apache, etc).
> >>> Just
>  doing a blanket replacement of password values with a constant would
> >>> likely
>  break things for people who secure things via a proxy (and may just
> not
>  allow reads of configs unless the user is authorized for the
> particular
>  connector). These are the types of concerns we like to think through
> in
> >>> the
>  compatibility section. One option to get the masking functionality in
>  without depending on a bunch of other security improvements might be
> to
>  make this configurable so users that need this (and can forgo seeing a
>  valid config via the API) can opt-in.
> 
>  Regarding your individual points:
> 
>  * I don't think the particular value for the masked content matters
> >> much.
>  Any constant indicating a password field is good. Your value seems
> fine
> >>> to
>  me.
>  * I don't think ConnectorInfo has enough info on its own to do proper
>  masking. In fact, I think you need to parse the config enough to get
> >> the
>  Connector-specific ConfigDef out in order to determine which fields
> are
>  Password fields. I would probably try to push this to be as central as
>  possible, maybe adding a method to AbstractHerder that can get configs
> >>> with
>  a boolean indicating whether they need to have sensitive fields
> >> removed.
>  That method could deal with parsing the config to get the right
> >>> connector,
>  getting the connector config, and then sanitizing any configs that are
>  sensitive. We could have this in one location, then have the relevant
> >>> REST
>  APIs just use the right flag to determine if they get sanitized or
> 

[jira] [Created] (KAFKA-6688) The Trogdor coordinator should track task statuses

2018-03-19 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-6688:
--

 Summary: The Trogdor coordinator should track task statuses
 Key: KAFKA-6688
 URL: https://issues.apache.org/jira/browse/KAFKA-6688
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


The Trogdor coordinator should track task statuses



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-19 Thread Richard Yu
Hi Ismael,

You have a great point. Since most of the methods in this KIP have similar
callbacks (position() and committed() both use fetchCommittedOffsets(), and
commitSync() is similar to position(), except that it just updates offsets),
the amount of time they block should also be about equal.

However, I think that we need to take into account a couple of things. For
starters, if the new methods were all reliant on one config, it is likely that
the shortcomings of this approach would be similar to what we faced if we let
request.timeout.ms control all method timeouts. In comparison, adding
overloads does not have this problem.
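
To make the trade-off concrete, here is a purely illustrative stand-in (not a
real or proposed Kafka API) for the two shapes being discussed -- a single
blanket config versus a per-call bound passed through an overload:

    import java.time.Duration;

    // Illustrative only: a stand-in for the two API shapes under discussion.
    interface BlockingApi {
        // Shape 1: the wait is bounded by one global config (e.g. a max.block.ms-style setting).
        long position(int partition);

        // Shape 2: the caller supplies the bound for this specific call,
        // and a TimeoutException is thrown if it is exceeded.
        long position(int partition, Duration timeout);
    }

The overload gives each call site its own bound, while the config keeps the
API surface small; that is essentially the flexibility question quoted below.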

If you have further thoughts, please let me know.

Richard


On Mon, Mar 19, 2018 at 5:12 PM, Ismael Juma  wrote:

> Hi,
>
> An option that is not currently covered in the KIP is to have a separate
> config max.block.ms, which is similar to the producer config with the same
> name. This came up during the KAFKA-2391 discussion. I think it's clear
> that we can't rely on request.timeout.ms, so the decision is between
> adding
> overloads or adding a new config. People seemed to be leaning towards the
> latter in KAFKA-2391, but Jason makes a good point that the overloads are
> more flexible. A couple of questions from me:
>
> 1. Do we need the additional flexibility?
> 2. If we do, do we need it for every blocking method?
>
> Ismael
>
> On Mon, Mar 19, 2018 at 5:03 PM, Richard Yu 
> wrote:
>
> > Hi Guozhang,
> >
> > I made some clarifications to KIP-266, namely:
> > 1. Stated more specifically that commitSync will accept user input.
> > 2. fetchCommittedOffsets(): Made its role in blocking more clear to the
> > reader.
> > 3. Sketched what would happen when time limit is exceeded.
> >
> > These changes should make the KIP easier to understand.
> >
> > Cheers,
> > Richard
> >
> > On Mon, Mar 19, 2018 at 9:33 AM, Guozhang Wang 
> wrote:
> >
> > > Hi Richard,
> > >
> > > I made a pass over the KIP again, some more clarifications / comments:
> > >
> > > 1. seek() call itself is not blocking, only the following poll() call
> may
> > > be blocking, as the actual metadata request will happen there.
> > >
> > > 2. I saw you did not include Consumer.partitionFor(),
> > > Consumer.OffsetAndTimestamp() and Consumer.listTopics() in your KIP.
> > After
> > > a second thought, I think this may be a better idea to not tackle them
> in
> > > the same KIP, and probably we should consider whether we would change
> the
> > > behavior or not in another discussion. So I agree to not include them.
> > >
> > > 3. In your wiki you mentioned "Another change shall be made to
> > > KafkaConsumer#poll(), due to its call to updateFetchPositions() which
> > > blocks indefinitely." This part may be a bit obscure to most readers who are
> > not
> > > familiar with the KafkaConsumer internals; could you please elaborate
> > > further. More specifically, I think the root causes of the public
> > APIs
> > > mentioned are a bit different while the KIP's explanation sounds like
> > they
> > > are due to the same reason:
> > >
> > > 3.1 fetchCommittedOffsets(): this internal call will block forever if
> the
> > > committed offsets cannot be fetched successfully and affect position()
> > and
> > > committed(). We need to break out of its internal while loop.
> > > 3.2 position() itself will while loop when offsets cannot be retrieved
> in
> > > the underlying async call. We need to break out this while loop.
> > > 3.3 commitSync() passed Long.MAX_VALUE as the timeout value, we should
> > take
> > > the user specified timeouts when applicable.
> > >
> > >
> > >
> > > Guozhang
> > >
> > > On Sat, Mar 17, 2018 at 4:44 PM, Richard Yu <
> yohan.richard...@gmail.com>
> > > wrote:
> > >
> > > > Actually, what I said above is inaccurate. In
> > > > testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue blocks,
> > not
> > > > seek.
> > > > My assumption is that seek did not update correctly. I will be
> digging
> > > > further into this.
> > > >
> > > >
> > > >
> > > > On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu <
> > yohan.richard...@gmail.com>
> > > > wrote:
> > > >
> > > > > One more thing: when looking through tests, I have realized that
> > seek()
> > > > > methods can potentially block indefinitely. As you well know,
> seek()
> > is
> > > > > called when pollOnce() or position() is active. Thus, if position()
> > > > blocks
> > > > > indefinitely, then so would seek(). Should bounding seek() also be
> > > > included
> > > > > in this KIP?
> > > > >
> > > > > Thanks, Richard
> > > > >
> > > > > On Sat, Mar 17, 2018 at 1:16 PM, Richard Yu <
> > > yohan.richard...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Thanks for the advice, Jason
> > > > >>
> > > > >> I have modified KIP-266 to include the java doc for committed()
> and
> > > > other
> > > > >> blocking methods, and I also
> > > > >> mentioned poll() which will also be bounded. Let me know if there

Build failed in Jenkins: kafka-trunk-jdk8 #2492

2018-03-19 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: KafkaFutureImpl#addWaiter should be protected (#4734)

[github] MINOR: Pass a streams config to replace the single state dir (#4714)

[rajinisivaram]  KAFKA-6680: Fix issues related to Dynamic Broker configs 
(#4731)

--
[...truncated 420.14 KB...]
kafka.network.SocketServerTest > testConnectionId STARTED

kafka.network.SocketServerTest > testConnectionId PASSED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics STARTED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics PASSED

kafka.network.SocketServerTest > testNoOpAction STARTED

kafka.network.SocketServerTest > testNoOpAction PASSED

kafka.network.SocketServerTest > simpleRequest STARTED

kafka.network.SocketServerTest > simpleRequest PASSED

kafka.network.SocketServerTest > closingChannelException STARTED

kafka.network.SocketServerTest > closingChannelException PASSED

kafka.network.SocketServerTest > testIdleConnection STARTED

kafka.network.SocketServerTest > testIdleConnection PASSED

kafka.network.SocketServerTest > 
testClientDisconnectionWithStagedReceivesFullyProcessed STARTED

kafka.network.SocketServerTest > 
testClientDisconnectionWithStagedReceivesFullyProcessed PASSED

kafka.network.SocketServerTest > testMetricCollectionAfterShutdown STARTED

kafka.network.SocketServerTest > testMetricCollectionAfterShutdown PASSED

kafka.network.SocketServerTest > testSessionPrincipal STARTED

kafka.network.SocketServerTest > testSessionPrincipal PASSED

kafka.network.SocketServerTest > configureNewConnectionException STARTED

kafka.network.SocketServerTest > configureNewConnectionException PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIpOverrides STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIpOverrides PASSED

kafka.network.SocketServerTest > processNewResponseException STARTED

kafka.network.SocketServerTest > processNewResponseException PASSED

kafka.network.SocketServerTest > processCompletedSendException STARTED

kafka.network.SocketServerTest > processCompletedSendException PASSED

kafka.network.SocketServerTest > processDisconnectedException STARTED

kafka.network.SocketServerTest > processDisconnectedException PASSED

kafka.network.SocketServerTest > sendCancelledKeyException STARTED

kafka.network.SocketServerTest > sendCancelledKeyException PASSED

kafka.network.SocketServerTest > processCompletedReceiveException STARTED

kafka.network.SocketServerTest > processCompletedReceiveException PASSED

kafka.network.SocketServerTest > testSocketsCloseOnShutdown STARTED

kafka.network.SocketServerTest > testSocketsCloseOnShutdown PASSED

kafka.network.SocketServerTest > pollException STARTED

kafka.network.SocketServerTest > pollException PASSED

kafka.network.SocketServerTest > testSslSocketServer STARTED

kafka.network.SocketServerTest > testSslSocketServer PASSED

kafka.network.SocketServerTest > tooBigRequestIsRejected STARTED

kafka.network.SocketServerTest > tooBigRequestIsRejected PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.MetricsDuringTopicCreationDeletionTest > 
testMetricsDuringTopicCreateDelete STARTED

kafka.integration.MetricsDuringTopicCreationDeletionTest > 
testMetricsDuringTopicCreateDelete PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled 
STARTED
ERROR: Could not install GRADLE_4_4_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:895)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:458)
 

Re: Seeking Feedback on Kafka Connect Issues

2018-03-19 Thread Ewen Cheslack-Postava
Responses inline.

On Mon, Mar 19, 2018 at 3:02 PM, Matt Farmer  wrote:

> Hi everyone,
>
> We’ve been experimenting recently with some limited use of Kafka Connect
> and are hoping to expand to wider use cases soon. However, we had some
> internal issues that gave us a well-timed preview of error handling
> behavior in Kafka Connect. I think the fixes for this will require at least
> three different KIPs, but I want to share some thoughts to get the initial
> reaction from folks in the dev community. If these ideas seem reasonable, I
> can go ahead and create the required KIPs.
>
> Here are the three things specifically we ran into…
>
> ---
>
> (1) Kafka Connect only retries tasks when certain exceptions are thrown
> Currently, Kafka Connect only retries tasks when certain exceptions are
> thrown - I believe the logic checks to see if the exception is specifically
> marked as “retryable” and if not, fails. We’d like to bypass this behavior
> and implement a configurable exponential backoff for tasks regardless of
> the failure reason. This is probably two changes: one to implement
> exponential backoff retries for tasks if they don’t already exist and a
> change to implement a RetryPolicy interface that evaluates the Exception to
> determine whether or not to retry.
>

This has definitely come up before. The likely "fix" for this is to provide
general "bad data handling" options within the framework itself. The
obvious set would be

1. fail fast, which is what we do today (assuming connector actually fails
and doesn't eat errors)
2. retry (possibly with configs to limit)
3. drop data and move on
4. dead letter queue

This needs to be addressed in a way that handles errors from:

1. The connector itself (e.g. connectivity issues to the other system)
2. Converters/serializers (bad data, unexpected format, etc)
3. SMTs
4. Ideally the framework as well (though I don't think we have any known bugs
where this would be a problem, and we'd be inclined to just fix them
anyway).

I think we understand the space of problems and how to address them pretty
well already; this issue is really just a matter of someone finding the
time to KIP, implement, and review/commit. (And that review/commit step
realistically means we need multiple people's time.) Happy to guide anyone
interested on next steps. If not addressed by general community, Confluent
will get to this at some point, but I couldn't say when that would be --
Randall might know better than I would.
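
To illustrate the "retry" option and the configurable backoff Matt asks for,
here is a minimal, framework-agnostic sketch (mine, not from any KIP); a real
version would also consult something like the proposed RetryPolicy to decide
which exceptions are worth retrying:

    import java.util.concurrent.Callable;

    public class BackoffRetry {
        // Retries a unit of work with exponential backoff, up to maxAttempts.
        static <T> T call(Callable<T> work, int maxAttempts, long initialBackoffMs) throws Exception {
            long backoffMs = initialBackoffMs;
            for (int attempt = 1; ; attempt++) {
                try {
                    return work.call();
                } catch (Exception e) {
                    if (attempt >= maxAttempts)
                        throw e;              // give up and surface the last failure
                    Thread.sleep(backoffMs);  // wait before the next attempt
                    backoffMs *= 2;           // exponential backoff
                }
            }
        }
    }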


> (2) Kafka Connect doesn’t permit Connectors to smartly reposition after
> rebalance
> We’re using the S3 connector to dump files with a large number of records
> into an S3 bucket. About 100,000 records per file. Unfortunately, every
> time a task fails, the consumer rebalance causes all partitions to get
> re-shuffled amongst the various partitions. To compensate for this, the
> connector gets stopped and started from what I can tell from the logs? And
> then picks up from the last consumer position that was committed to the
> brokers.
>
> This doesn’t work great if you’re batching things into large numbers for
> archival.
>
> For the S3 connector, for example: Let’s say I have two partitions and the
> connector has two tasks to process each of those. Task 0 is at 5,000
> records read from the last commit and Task 1 is at 70,000 records read from
> the last commit. Then, boom, something goes wrong with Task 0 and it falls
> over. This triggers a rebalance and Task 1 has to take over the workload.
> Task 1 will, at this point, discard the 70,000 records in its buffer and
> start from the last commit point. This failure mode is brutal for the
> archival system we’re building.
>
>
Yes, this is a known pain point. Usually it shows up as more of an issue
for running a lot of connectors (where you don't want a task's failure to
unnecessarily affect unrelated work), but the concern for connectors which
do relatively infrequent commits is valid as well. I'll make a point on the
first solution, then see below for a more complete answer.


> There are two solutions that I can think of to this:
>
> (A) Provide an interface for connectors to define their own rebalance
> listener. This listener could compare the newly assigned list of partitions
> with a previously assigned list. For all partitions that this connector was
> already working on prior to the rebalance, it could manually seek to the
> last position it locally processed before resuming. So, in the scenario
> above Task 1 could keep an accounting file locally and seek over the first
> 70,000 records without reprocessing them. It would then wait until after it
> confirms the S3 upload to commit those offsets back to Kafka. This ensures
> that if the machine running Task 1 dies a new consumer can take its place,
> but we’ll still benefit from a local cache if one is present.
>

For sink tasks, this actually already exists -- see

Build failed in Jenkins: kafka-1.1-jdk7 #92

2018-03-19 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: KafkaFutureImpl#addWaiter should be protected (#4734)

[rajinisivaram]  KAFKA-6680: Fix issues related to Dynamic Broker configs 
(#4731)

--
[...truncated 415.49 KB...]

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2 PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearEarliestOnEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldPreserveResetOffsetOnClearEarliestIfOneExists STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldPreserveResetOffsetOnClearEarliestIfOneExists PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldFetchEndOffsetOfEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearAllEntries PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache 
STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > shouldClearLatestOnEmptyCache 
PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotResetEpochHistoryHeadIfUndefinedPassed STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotResetEpochHistoryHeadIfUndefinedPassed PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica 
STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnUnknownTopicOrPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnUnknownTopicOrPartitionIfThrown PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnNoLeaderForPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnNoLeaderForPartitionIfThrown PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldSurviveFastLeaderChange STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldSurviveFastLeaderChange PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
offsetsShouldNotGoBackwards STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
offsetsShouldNotGoBackwards PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldFollowLeaderEpochBasicWorkflow STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldFollowLeaderEpochBasicWorkflow PASSED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldNotAllowDivergentLogs STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
shouldNotAllowDivergentLogs PASSED

kafka.server.MultipleListenersWithAdditionalJaasContextTest > 
testProduceConsume STARTED

kafka.server.MultipleListenersWithAdditionalJaasContextTest > 
testProduceConsume PASSED

kafka.server.OffsetCommitTest > testUpdateOffsets STARTED

kafka.server.OffsetCommitTest > testUpdateOffsets PASSED

kafka.server.OffsetCommitTest > testLargeMetadataPayload STARTED

kafka.server.OffsetCommitTest > testLargeMetadataPayload PASSED

kafka.server.OffsetCommitTest > testOffsetsDeleteAfterTopicDeletion STARTED

kafka.server.OffsetCommitTest > testOffsetsDeleteAfterTopicDeletion PASSED

kafka.server.OffsetCommitTest > testCommitAndFetchOffsets STARTED

kafka.server.OffsetCommitTest > 

Build failed in Jenkins: kafka-trunk-jdk7 #3270

2018-03-19 Thread Apache Jenkins Server
See 


Changes:

[mjsax] KAFKA-6486: Implemented LinkedHashMap in TimeWindows (#4628)

--
[...truncated 1.53 MB...]
org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowStreamsExceptionNoResetSpecified STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowStreamsExceptionNoResetSpecified PASSED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled STARTED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled PASSED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldRestoreState STARTED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldRestoreState PASSED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldSuccessfullyStartWhenLoggingDisabled STARTED

org.apache.kafka.streams.integration.RestoreIntegrationTest > 
shouldSuccessfullyStartWhenLoggingDisabled PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenInputTopicAbsent STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenInputTopicAbsent PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenIntermediateTopicAbsent STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenIntermediateTopicAbsent PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingByDurationAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingByDurationAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromFileAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromFileAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhileStreamsRunning STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhileStreamsRunning PASSED
ERROR: Could not install GRADLE_3_5_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:895)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:458)
at 
hudson.plugins.git.GitSCM.compareRemoteRevisionWithImpl(GitSCM.java:666)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWith(GitSCM.java:631)
at hudson.scm.SCM.compareRemoteRevisionWith(SCM.java:391)
at hudson.scm.SCM.poll(SCM.java:408)
at hudson.model.AbstractProject._poll(AbstractProject.java:1384)
at hudson.model.AbstractProject.poll(AbstractProject.java:1287)
at hudson.triggers.SCMTrigger$Runner.runPolling(SCMTrigger.java:594)
at hudson.triggers.SCMTrigger$Runner.run(SCMTrigger.java:640)
at 
hudson.util.SequentialExecutionQueue$QueueEntry.run(SequentialExecutionQueue.java:119)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated 

Re: [DISCUSS] KIP-233: Simplify StreamsBuilder#addGlobalStore

2018-03-19 Thread Panuwat Anawatmongkhon
Hi Matthias,
I am not sure whether I understand the impact correctly. With the old API,
users have to specify a name for the state store, and that name is used for
the auto-generated topics for fault tolerance. So when replacing the old API
usage with the new one, we don't have any kind of automatic renaming, and the
backup topic name will be inconsistent.
Cheers,
Benz

On Mon, 19 Mar 2561 at 10:59 Panuwat Anawatmongkhon <
panuwat.anawatmongk...@gmail.com> wrote:

> I couldn’t find this thread in the mailing list. This reply is just to bump
> it up so I can include it in the KIP.
>
> On Thu, 28 Dec 2560 at 03:07 Matthias J. Sax 
> wrote:
>
>> @Matthias: just wanted to follow up on your question:
>>
>>  I wanted to double check. If I understand the proposal, it would
>> replace
>>  the explicit name with a name that is dynamically generated using the
>>  AtomicInteger index. Would this affect the naming of any internally
>>  generated topics?
>>
>> I think it would -- note, that the old API will not be removed but
>> deprecated -- thus, you can still update without any issues staying with
>> the old API -- only if you start to use the new API, it could impact an
>> application.
>>
>> It should still be possible to upgrade to the new API if you invest the
>> time to rename the corresponding topics correctly -- this will only work
>> if you use a new application id or take your application offline though.
>>
>> -Matthias
>>
>>
>> On 12/16/17 10:17 AM, Panuwat Anawatmongkhon wrote:
>> > Hi all,
>> > I would like to start the vote thread tomorrow, feel free to ask if
>> there
>> > is any concern.
>> > Thank you
>> >
>> > On Thu, 7 Dec 2560 at 19:22 Panuwat Anawatmongkhon <
>> > panuwat.anawatmongk...@gmail.com> wrote:
>> >
>> >>
>> >> Yes, Matthias.
>> >> The object will be used together with function table and function
>> stream.
>> >> I didn’t see how this will affect other parts, but if you do, please
>> explain
>> >> more on how this will affect the generated topic names.
>> >> Thank you
>> >> Panuwat
>> >>
>> >>
>> >> On Thu, 7 Dec 2560 at 00:01 Matthias Margush <
>> matthias.marg...@gmail.com>
>> >> wrote:
>> >>
>> >>> Hi.
>> >>>
>> >>> I wanted to double check. If I understand the proposal, it would
>> replace
>> >>> the explicit name with a name that is dynamically generated using the
>> >>> AtomicInteger index. Would this affect the naming of any internally
>> >>> generated topics?
>> >>>
>> >>> On Wed, Dec 6, 2017 at 7:59 AM Panuwat Anawatmongkhon <
>> >>> panuwat.anawatmongk...@gmail.com> wrote:
>> >>>
>>  Thanks Bill.
>> 
>>  I can't think of a reason to keep the old method too, so if there is no
>>  further discussion by tomorrow, I would like to start the vote
>> thread.
>> 
>>  On Tue, Dec 5, 2017 at 10:38 PM, Bill Bejeck 
>> wrote:
>> 
>> > Hi Panuwat,
>> >
>> > Thanks for the KIP, overall looks good to me.
>> >
>> > I want to play the devil's advocate for a second and ask do we want
>> to
>>  keep
>> > the older method with the extra parameters vs. deprecation?
>> >
>> > Although ATM I can't think of a good reason to keep the old method
>> >>> with
>>  the
>> > extra parameters.
>> >
>> > Thanks,
>> > Bill
>> >
>> > On Tue, Dec 5, 2017 at 5:48 AM, Ted Yu  wrote:
>> >
>> >> Fine by me.
>> >>
>> >> On Tue, Dec 5, 2017 at 2:45 AM, Panuwat Anawatmongkhon <
>> >> panuwat.anawatmongk...@gmail.com> wrote:
>> >>
>> >>> Thank you, Matthias.
>> >>>
>> >>> Ted,
>> >>> How about this.
>> >>>
>> >>> String globalTopicName = "testGlobalTopic";
>> >>> String globalStoreName = "testAddGlobalStore";
>> >>> final StreamsBuilder builder = new StreamsBuilder();
>> >>> final KeyValueStoreBuilder globalStoreBuilder =
>> >>> EasyMock.createNiceMock(KeyValueStoreBuilder.class);
>> >>>
>> 
>> EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).
>> >>> anyTimes();
>> >>> EasyMock.replay(globalStoreBuilder);
>> >>> builder.addGlobalStore(globalStoreBuilder,globalTopicName,new
>> >>> ConsumedInternal(),new MockProcessorSupplier());
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On Tue, Dec 5, 2017 at 4:58 AM, Matthias J. Sax <
>>  matth...@confluent.io
>> >>
>> >>> wrote:
>> >>>
>>  Panuwat,
>> 
>>  Thanks a lot for the KIP!
>> 
>>  Just one nit: `does not follow provide a good` -> spelling:
>> >>> remove
>>  `follow` ?
>> 
>>  Otherwise, looks good to me.
>> 
>> 
>>  -Matthias
>> 
>> 
>> 
>>  On 12/4/17 10:49 AM, Ted Yu wrote:
>> > Looks like you're implying logic similar to this:
>> >
>> > public synchronized  GlobalKTable
>>  globalTable(final
>>  String
>> > topic,
>> >
>> 

Jenkins build is back to normal : kafka-trunk-jdk9 #491

2018-03-19 Thread Apache Jenkins Server
See