[jira] [Created] (KAFKA-7274) Incorrect subject credential used in inter-broker communication

2018-08-09 Thread TAO XIAO (JIRA)
TAO XIAO created KAFKA-7274:
---

 Summary: Incorrect subject credential used in inter-broker communication
 Key: KAFKA-7274
 URL: https://issues.apache.org/jira/browse/KAFKA-7274
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 2.0.0, 1.1.1, 1.1.0, 1.0.2, 1.0.1, 1.0.0
Reporter: TAO XIAO


We configured a single-broker setup to enable multiple SASL mechanisms using a 
JAAS config file, but the broker failed to start.

 

Here is the security section of server.properties:

 

{{listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=PLAIN}}

 

JAAS file

 
{noformat}
sasl_plaintext.KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";

  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin1"
  password="admin-secret";
};{noformat}
 

Exception we got

 
{noformat}
[2018-08-10 12:12:13,070] ERROR [Controller id=0, targetBrokerId=0] Connection 
to node 0 failed authentication due to: Authentication failed: Invalid username 
or password (org.apache.kafka.clients.NetworkClient){noformat}
 

If we instead use the broker configuration properties, the broker starts 
successfully:

 
{noformat}
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=PLAIN
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret";
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin1" password="admin-secret";{noformat}
 

I believe this issue is caused by Kafka assigning all login modules to each 
defined mechanism when a JAAS file is used. As a result, the Login class adds 
the usernames defined in both login modules to the same subject.

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java#L101]

 

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java#L63]
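
The suspected effect can be illustrated with plain JDK classes. The sketch below is not Kafka code: the class and method names are made up for illustration, and the `login` helper only stands in for a login module that stores its configured username and password on the subject. If the reading above is right, a subject shared by both modules carries two usernames, so the PLAIN client has no unambiguous credential to send.

```java
import java.util.Set;
import javax.security.auth.Subject;

// Sketch only: mimics, with JDK classes, two login modules populating one Subject.
// SharedSubjectSketch and its methods are hypothetical names, not Kafka APIs.
final class SharedSubjectSketch {

    // Stand-in for a login module: store the username and password on the subject.
    static void login(Subject subject, String username, String password) {
        subject.getPublicCredentials().add(username);
        subject.getPrivateCredentials().add(password);
    }

    // JAAS-file case from the report: both modules write to the same subject.
    static Subject sharedSubject() {
        Subject subject = new Subject();
        login(subject, "admin", "admin-secret");   // PLAIN module
        login(subject, "admin1", "admin-secret");  // SCRAM module
        return subject;
    }

    // Per-mechanism case: the PLAIN mechanism gets a subject of its own.
    static Subject plainOnlySubject() {
        Subject subject = new Subject();
        login(subject, "admin", "admin-secret");
        return subject;
    }

    public static void main(String[] args) {
        Set<String> shared = sharedSubject().getPublicCredentials(String.class);
        Set<String> plain = plainOnlySubject().getPublicCredentials(String.class);
        System.out.println("usernames on shared subject: " + shared.size());
        System.out.println("usernames on PLAIN-only subject: " + plain.size());
    }
}
```

This would also be consistent with the per-listener, per-mechanism `sasl.jaas.config` properties working, since each mechanism then has only its own login module.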

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #2883

2018-08-09 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-6751; Support dynamic configuration of

--
[...truncated 2.48 MB...]

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterPersistentStore STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterPersistentStore PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered 
STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered 
PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldRestoreStoreWithSinglePutRestoreSpecification STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldRestoreStoreWithSinglePutRestoreSpecification PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldNotChangeOffsetsIfAckedOffsetsIsNull STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldNotChangeOffsetsIfAckedOffsetsIsNull PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCloseStateManagerWithOffsets STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCloseStateManagerWithOffsets PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForTopic STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForTopic PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeProcessorTopology STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeProcessorTopology PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeContext STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeContext PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCheckpointOffsetsWhenStateIsFlushed STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldCheckpointOffsetsWhenStateIsFlushed PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenKeyDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenKeyDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeStateManager STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldInitializeStateManager PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForOtherTopic STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldProcessRecordsForOtherTopic PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenValueDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenValueDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenValueDeserializationFails STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldThrowStreamsExceptionWhenValueDeserializationFails PASSED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler STARTED

org.apache.kafka.streams.processor.internals.GlobalStateTaskTest > 
shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testThroughputMetrics STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testThroughputMetrics PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testLatencyMetrics STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testLatencyMetrics PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveSensor STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveSensor PASSED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 
testRemoveNullSensor STARTED

org.apache.kafka.streams.processor.internals.StreamsMetricsImplTest > 

Re: [VOTE] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-09 Thread Jun Rao
Hi, Jason,

Thanks for the KIP. +1 from me.

Jun

On Wed, Aug 8, 2018 at 1:04 PM, Jason Gustafson  wrote:

> Hi All,
>
> I'd like to start a vote for KIP-320:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 320%3A+Allow+fetchers+to+detect+and+handle+log+truncation.
> Thanks to everyone who reviewed the proposal. Please feel free to send
> additional questions to the discussion thread if you have any.
>
> +1 from me (duh)
>
> Thanks,
> Jason
>


[DISCUSS] KIP-349 Priorities for Source Topics

2018-08-09 Thread nick

Since there are questions I changed the heading from VOTE to DISCUSS

> On Aug 8, 2018, at 9:09 PM, Matt Farmer  wrote:
> 
> Is it worth spelling out explicitly what the behavior is when two topics
> have the same priority? I'm a bit fuzzy on how we choose what topics to
> consume from right now, if I'm being honest, so it could be useful to
> outline the current behavior in the background and to spell out how that
> would change (or if it would change) when two topics are given the same
> priority.

I added an additional note in the KIP’s Compatibility section to clarify that 
current behavior would not change in order to preserve backwards compatibility. 
 

> Also, how does this play with max.poll.records? Does the consumer read from
> all the topics in priority order until we've hit the number of records or
> the poll timeout? Or does it immediately return the high priority records
> without pulling low priority records?


My own interpretation would be to read from all the topics in priority order as 
the consumer is subscribed to multiple topics.  
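
To make that interpretation concrete, here is a rough sketch of draining already-buffered records in priority order until a record cap is reached. It is not the consumer's real fetch path: `PriorityPollSketch`, the "higher number wins" convention, and the tie-break by topic name are all assumptions for illustration only, and the KIP does not define them this way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: hypothetical helper, not part of the Kafka consumer API.
final class PriorityPollSketch {

    // Returns up to maxPollRecords records, taking topics in descending priority.
    static List<String> poll(Map<String, List<String>> buffered,
                             Map<String, Integer> priorities,
                             int maxPollRecords) {
        List<String> topics = new ArrayList<>(buffered.keySet());
        // Assumption: a larger number means higher priority; equal priorities
        // fall back to topic name so the result is deterministic.
        topics.sort((a, b) -> {
            int byPriority = Integer.compare(priorities.getOrDefault(b, 0),
                                             priorities.getOrDefault(a, 0));
            return byPriority != 0 ? byPriority : a.compareTo(b);
        });
        List<String> out = new ArrayList<>();
        for (String topic : topics) {
            for (String record : buffered.get(topic)) {
                if (out.size() >= maxPollRecords) {
                    return out; // cap reached, lower-priority records wait
                }
                out.add(record);
            }
        }
        return out;
    }
}
```

Under this reading, low-priority records are returned only when the higher-priority topics leave room under the cap.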
--
  Nick

> 
> 
> On Wed, Aug 8, 2018 at 8:39 PM  > wrote:
> 
>> 
>> Hi All,
>> 
>> Calling for a vote on KIP-349
>> 
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics
>>  
>> 
>> 
>> 






Re: [DISCUSS] KIP-346 - Limit blast radius of log compaction failure

2018-08-09 Thread Jason Gustafson
Hey Stanislav,

Sorry, I was probably looking at an older version (I had the tab open for
so long!).

I have been thinking about `max.uncleanable.partitions` and wondering if
it's what we really want. The main risk if the cleaner cannot clean a
partition is eventually running out of disk space. This is the most common
problem we have seen with cleaner failures and it can happen even if there
is just one uncleanable partition. We've actually seen cases in which a
single __consumer_offsets grew large enough to fill a significant portion
of the disk. The difficulty with allowing a system to run out of disk space
before failing is that it makes recovery difficult and time consuming.
Clean shutdown, for example, requires writing some state to disk. Without
clean shutdown, it can take the broker significantly longer to start up
because it has to do more segment recovery.

For this problem, `max.uncleanable.partitions` does not really help. You
can set it to 1 and fail fast, but that is not much better than the
existing state. You had a suggestion previously in the KIP to use the size
of uncleanable disk space instead. What was the reason for rejecting that?
Intuitively, it seems like a better fit for a cleaner failure. It would
provide users some time to react to failures while still protecting them
from exhausting the disk.
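
To illustrate the difference between the two limits, a small sketch follows. The class, method names, and thresholds are hypothetical, and the count check assumes the limit trips once the number of uncleanable partitions reaches the configured value; neither check is taken from the KIP's implementation.

```java
import java.util.Map;

// Sketch only: hypothetical helper comparing a count-based and a size-based limit.
final class UncleanableBudgetSketch {

    // Count-based check, in the spirit of max.uncleanable.partitions.
    static boolean reachesPartitionLimit(Map<String, Long> uncleanableBytesByPartition,
                                         int maxPartitions) {
        return uncleanableBytesByPartition.size() >= maxPartitions;
    }

    // Size-based check: total bytes held by uncleanable partitions against a byte budget.
    static boolean exceedsByteLimit(Map<String, Long> uncleanableBytesByPartition,
                                    long maxBytes) {
        long total = 0L;
        for (long bytes : uncleanableBytesByPartition.values()) {
            total += bytes;
        }
        return total > maxBytes;
    }
}
```

With a single very large uncleanable partition, the count-based check stays quiet at a limit of 10 while the byte budget trips, which is the case described above.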

Thanks,
Jason




On Thu, Aug 9, 2018 at 9:46 AM, Stanislav Kozlovski 
wrote:

> Hey Jason,
>
> 1. *10* is the default value, it says so in the KIP
> 2. This is a good catch. As the current implementation stands, it's not a
> useful metric since the thread continues to run even if all log directories
> are offline (although I'm not sure what the broker's behavior is in that
> scenario). I'll make sure the thread stops if all log directories are
> offline.
>
> I don't know which "Needs Discussion" item you're referencing, there hasn't
> been any in the KIP since August 1 and that was for the metric only. KIP
> History
>  pageId=89064875>
>
> I've updated the KIP to mention the "time-since-last-run" metric.
>
> Thanks,
> Stanislav
>
> On Wed, Aug 8, 2018 at 12:12 AM Jason Gustafson 
> wrote:
>
> > Hi Stanislav,
> >
> > Just a couple quick questions:
> >
> > 1. I may have missed it, but what will be the default value for
> > `max.uncleanable.partitions`?
> > 2. It seems there will be some impact for users that are monitoring
> > "time-since-last-run-ms" in order to detect cleaner failures. Not sure it's
> > a major concern, but probably worth mentioning in the compatibility
> > section. Also, is this still a useful metric after this KIP?
> >
> > Also, maybe the "Needs Discussion" item can be moved to rejected
> > alternatives since you've moved to a vote? I think leaving this for
> > potential future work is reasonable.
> >
> > Thanks,
> > Jason
> >
> >
> > On Mon, Aug 6, 2018 at 12:29 PM, Ray Chiang  wrote:
> >
> > > I'm okay with that.
> > >
> > > -Ray
> > >
> > > On 8/6/18 10:59 AM, Colin McCabe wrote:
> > >
> > >> Perhaps we could start with max.uncleanable.partitions and then implement
> > >> max.uncleanable.partitions.per.logdir in a follow-up change if it seemed
> > >> to be necessary?  What do you think?
> > >>
> > >> regards,
> > >> Colin
> > >>
> > >>
> > >> On Sat, Aug 4, 2018, at 10:53, Stanislav Kozlovski wrote:
> > >>
> > >>> Hey Ray,
> > >>>
> > >>> Thanks for the explanation. In regards to the configuration property -
> > >>> I'm not sure. As long as it has sufficient documentation, I find
> > >>> "max.uncleanable.partitions" to be okay. If we were to add the
> > >>> distinction explicitly, maybe it should be
> > >>> `max.uncleanable.partitions.per.logdir`?
> > >>>
> > >>> On Thu, Aug 2, 2018 at 7:32 PM Ray Chiang 
> wrote:
> > >>>
> > >>> One more thing occurred to me.  Should the configuration property be
> >  named "max.uncleanable.partitions.per.disk" instead?
> > 
> >  -Ray
> > 
> > 
> >  On 8/1/18 9:11 AM, Stanislav Kozlovski wrote:
> > 
> > > Yes, good catch. Thank you, James!
> > >
> > > Best,
> > > Stanislav
> > >
> > > On Wed, Aug 1, 2018 at 5:05 PM James Cheng 
> > > wrote:
> > >
> > > Can you update the KIP to say what the default is for
> > >> max.uncleanable.partitions?
> > >>
> > >> -James
> > >>
> > >> Sent from my iPhone
> > >>
> > >> On Jul 31, 2018, at 9:56 AM, Stanislav Kozlovski <
> > >>>
> > >> stanis...@confluent.io>
> > 
> > > wrote:
> > >>
> > >>> Hey group,
> > >>>
> > >>> I am planning on starting a voting thread tomorrow. Please do
> reply
> > >>> if
> > >>>
> > >> you
> > >>
> > >>> feel there is anything left to discuss.
> > >>>
> > >>> Best,
> > >>> Stanislav
> > >>>
> > >>> On Fri, Jul 27, 2018 at 11:05 PM Stanislav Kozlovski <
> > >>>
> > >> stanis...@confluent.io>
> > >>
> > >>> 

[DISCUSS] KIP-354 Time-based log compaction policy

2018-08-09 Thread xiongqi wu
Hi Kafka,

This KIP tries to address a GDPR concern: fulfilling deletion requests on time
through time-based log compaction on a compaction-enabled topic:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy

Any feedback will be appreciated.


Xiongqi (Wesley) Wu


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-08-09 Thread Lucas Wang
@Becket,

I've asked for review by Jun and Joel in the vote thread.
Regarding the separate thread and port, I did talk about it in the rejected
alternative design 1.
Please let me know if you'd like more elaboration or moving it to the
motivation, etc.

Thanks,
Lucas

On Wed, Aug 8, 2018 at 3:59 PM, Becket Qin  wrote:

> Hi Lucas,
>
> Yes, a separate Jira is OK.
>
> Since the proposal has changed significantly since the initial vote
> started, we should probably let the others who have already voted know and
> ensure they are happy with the updated proposal.
> Also, it seems the motivation part of the KIP wiki still talks only
> about the separate queue and does not fully cover the changes we make now, e.g.
> separate thread, port, etc. We might want to explain a bit more so that
> people who did not follow the discussion mail thread also understand the
> whole proposal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Aug 8, 2018 at 12:44 PM, Lucas Wang  wrote:
>
> > Hi Becket,
> >
> > Thanks for the review. The current write up in the KIP won’t change the
> > ordering behavior. Are you ok with addressing that as a separate
> > independent issue (I’ll create a separate ticket for it)?
> > If so, can you please give me a +1 on the vote thread?
> >
> > Thanks,
> > Lucas
> >
> > On Tue, Aug 7, 2018 at 7:34 PM Becket Qin  wrote:
> >
> > > Thanks for the updated KIP wiki, Lucas. Looks good to me overall.
> > >
> > > It might be an implementation detail, but do we still plan to use the
> > > correlation id to ensure the request processing order?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Jul 31, 2018 at 3:39 AM, Lucas Wang 
> > wrote:
> > >
> > > > Thanks for your review, Dong.
> > > > Ack that these configs will have a bigger impact for users.
> > > >
> > > > On the other hand, I would argue that the request queue becoming full
> > > > may or may not be a rare scenario.
> > > > How often the request queue gets full depends on the request incoming
> > > rate,
> > > > the request processing rate, and the size of the request queue.
> > > > When that happens, the dedicated endpoints design can better handle
> > > > it than any of the previously discussed options.
> > > >
> > > > Another reason I made the change was that I have the same taste
> > > > as Becket that it's a better separation of the control plane from the
> > > data
> > > > plane.
> > > >
> > > > Finally, I want to clarify that this change is NOT motivated by the
> > > > out-of-order
> > > > processing discussion. The latter problem is orthogonal to this KIP,
> > and
> > > it
> > > > can happen in any of the design options we discussed for this KIP so
> > far.
> > > > So I'd like to address out-of-order processing separately in another
> > > > thread,
> > > > and avoid mentioning it in this KIP.
> > > >
> > > > Thanks,
> > > > Lucas
> > > >
> > > > On Fri, Jul 27, 2018 at 7:51 PM, Dong Lin 
> wrote:
> > > >
> > > > > Hey Lucas,
> > > > >
> > > > > Thanks for the update.
> > > > >
> > > > > The current KIP propose new broker configs
> "listeners.for.controller"
> > > and
> > > > > "advertised.listeners.for.controller". This is going to be a big
> > change
> > > > > since listeners are among the most important configs that every
> user
> > > > needs
> > > > > to change. According to the rejected alternative section, it seems
> > that
> > > > the
> > > > > reason to add these two configs is to improve performance when the
> > data
> > > > > request queue is full rather than for correctness. It should be a
> > very
> > > > rare
> > > > > scenario and I am not sure we should add configs for all users just
> > to
> > > > > improve the performance in such rare scenario.
> > > > >
> > > > > Also, if the new design is based on the issues which are discovered
> > in
> > > > the
> > > > > recent discussion, e.g. out of order processing if we don't use a
> > > > dedicated
> > > > > thread for controller request, it may be useful to explain the
> > problem
> > > in
> > > > > the motivation section.
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Fri, Jul 27, 2018 at 1:28 PM, Lucas Wang  >
> > > > wrote:
> > > > >
> > > > > > A kind reminder for review of this KIP.
> > > > > >
> > > > > > Thank you very much!
> > > > > > Lucas
> > > > > >
> > > > > > On Wed, Jul 25, 2018 at 10:23 PM, Lucas Wang <
> > lucasatu...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I've updated the KIP by adding the dedicated endpoints for
> > > controller
> > > > > > > connections,
> > > > > > > and pinning threads for controller requests.
> > > > > > > Also I've updated the title of this KIP. Please take a look and
> > let
> > > > me
> > > > > > > know your feedback.
> > > > > > >
> > > > > > > Thanks a lot for your time!
> > > > > > > Lucas
> > > > > > >
> > > > > > > On Tue, Jul 24, 2018 at 10:19 AM, Mayuresh Gharat <
> > > > > > > gharatmayures...@gmail.com> wrote:
> > > > > > >
> > > > > > 

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-08-09 Thread Lucas Wang
Hi Jun and Joel,

The KIP writeup has changed by quite a bit since your +1.
Can you please take another review? Thanks a lot for your time!

Lucas

On Tue, Jul 17, 2018 at 10:33 AM, Joel Koshy  wrote:

> +1 on the KIP.
>
> (I'm not sure it is actually necessary to introduce the condition variables
> for the concern that Jun raised, but it's an implementation detail that we
> can defer to a discussion in the PR.)
>
> On Sat, Jul 14, 2018 at 10:45 PM, Lucas Wang 
> wrote:
>
> > Hi Jun,
> >
> > I agree that by using the condition variables, there is no need to add such a
> > new config.
> > Also thanks for approving this KIP.
> >
> > Lucas
> >
>


[jira] [Resolved] (KAFKA-6751) Make max.connections.per.ip.overrides a dynamic config

2018-08-09 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6751.

Resolution: Fixed

> Make max.connections.per.ip.overrides a dynamic config
> --
>
> Key: KAFKA-6751
> URL: https://issues.apache.org/jira/browse/KAFKA-6751
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Manikumar
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> It might be useful to be able to update this config dynamically since we 
> occasionally run into situations where a particular host (or set of hosts) is 
> causing some trouble for the broker.





Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-08-09 Thread Matthias J. Sax
Thanks for your input Guozhang and John.

I see your point, that the upgrade API is not simple. If you don't
think it's valuable to make generic store upgrades possible (atm), we
can make the API internal, too. The impact is that we only support a
predefined set of upgrades (i.e., KV to KVwithTs, Windowed to
WindowedWithTS, etc.) for which we implement the internal interfaces.

We can keep the design generic, so if we decide to make it public, we
don't need to re-invent it. This will also have the advantage, that we
can add upgrade pattern for other stores later, too.

I also agree, that the `StoreUpgradeBuilder` is a little ugly, but it
was the only way I could find to design a generic upgrade interface. If
we decide to hide all the upgrade stuff, `StoreUpgradeBuilder` would
become an internal interface I guess (don't think we can remove it).

I will wait for more feedback about this and if nobody wants to keep it
as public API I will update the KIP accordingly. Will add some more
clarifications for different upgrade patterns in the mean time and fix
the typos/minor issues.

About adding a new state UPGRADING: maybe we could do that. However, I
find it particularly difficult to make the estimation when we should
switch to RUNNING, thus, I am a little hesitant. Using store callbacks
or just logging the progress including some indication about the "lag"
might actually be sufficient. Not sure what others think?

About "value before timestamp": no real reason and I think it does not
make any difference. Do you want to change it?

About upgrade robustness: yes, we cannot control if an instance fails.
That is what I meant by "we need to write tests". The upgrade should be
able to continue even if an instance goes down (and we must make sure
that we don't end up in an invalid state that forces us to wipe out the
whole store). Thus, we need to write system tests that fail instances
during upgrade.

For `in_place_offline` upgrade: I don't think we need this mode, because
people can do this via a single rolling bounce.

 - prepare code and switch KV-Store to KVwithTs-Store
 - do a single rolling bounce (don't set any upgrade config)

For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if we
remove the `StoreUpgradeBuilder`) will detect that there is only an old
local KV store w/o TS, will start to restore the new KVwithTs store,
wipe out the old store and replace with the new store after restore is
finished, and start processing only afterwards. (I guess we need to
document this case -- will also add it to the KIP.)
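
The sequence described above could be sketched roughly as follows. This is only an illustration of the ordering, not the proposed implementation: `InPlaceUpgradePlanSketch`, its `Step` names, and the two boolean inputs are hypothetical, and the behavior when a new store already exists is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: hypothetical planner for the single-rolling-bounce case.
final class InPlaceUpgradePlanSketch {

    enum Step { RESTORE_NEW_STORE, WIPE_OLD_STORE, SWAP_IN_NEW_STORE, START_PROCESSING }

    // Decide what to do on startup, given which local stores are found on disk.
    static List<Step> plan(boolean oldStoreOnDisk, boolean newStoreOnDisk) {
        List<Step> steps = new ArrayList<>();
        if (oldStoreOnDisk && !newStoreOnDisk) {
            // Only an old KV store without timestamps exists: restore first,
            // then replace the old store with the new one.
            steps.add(Step.RESTORE_NEW_STORE);
            steps.add(Step.WIPE_OLD_STORE);
            steps.add(Step.SWAP_IN_NEW_STORE);
        }
        // Processing starts only after any upgrade work has finished.
        steps.add(Step.START_PROCESSING);
        return steps;
    }
}
```

The point of the ordering is that processing never begins against a half-restored store.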



-Matthias



On 8/9/18 1:10 PM, John Roesler wrote:
> Hi Matthias,
> 
> I think this KIP is looking really good.
> 
> I have a few thoughts to add to the others:
> 
> 1. You mentioned at one point users needing to configure
> `upgrade.mode="null"`. I think this was a typo and you meant to say they
> should remove the config. If they really have to set it to a string "null"
> or even set it to a null value but not remove it, it would be unfortunate.
> 
> 2. In response to Bill's comment #1 , you said that "The idea is that the
> upgrade should be robust and not fail. We need to write according tests".
> I may have misunderstood the conversation, but I don't think it's within
> our power to say that an instance won't fail. What if one of my computers
> catches on fire? What if I'm deployed in the cloud and one instance
> disappears and is replaced by a new one? Or what if one instance goes AWOL
> for a long time and then suddenly returns? How will the upgrade process
> behave in light of such failures?
> 
> 3. your thought about making in-place an offline mode is interesting, but
> it might be a bummer for on-prem users who wish to upgrade online, but
> cannot just add new machines to the pool. It could be a new upgrade mode
> "offline-in-place", though...
> 
> 4. I was surprised to see that a user would need to modify the topology to
> do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's
> suggestions would remove this necessity.
> 
> Thanks for taking on this very complex but necessary work.
> 
> -John
> 
> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang  wrote:
> 
>> Hello Matthias,
>>
>> Thanks for the updated KIP. Some more comments:
>>
>> 1. The current set of proposed API is a bit too complicated, which makes
>> the upgrade flow from user's perspective also a bit complex. I'd like to
>> check different APIs and discuss about their needs separately:
>>
>> 1.a. StoreProxy: needed for in-place upgrade only, between the first
>> and second rolling bounce, where the old-versioned stores can handle
>> new-versioned store APIs. I think such upgrade paths (i.e. from one store
>> type to another) would not be very common: users may want to upgrade from a
>> certain store engine to another, but the interface would likely be staying
>> the same. Hence personally I'd suggest we keep it internally and only
>> consider exposing it in the future if it does become a common pattern.

[jira] [Created] (KAFKA-7273) Converters should have access to headers.

2018-08-09 Thread Jeremy Custenborder (JIRA)
Jeremy Custenborder created KAFKA-7273:
--

 Summary: Converters should have access to headers.
 Key: KAFKA-7273
 URL: https://issues.apache.org/jira/browse/KAFKA-7273
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Jeremy Custenborder


I found myself wanting to build a converter that stored additional type 
information within headers. The converter interface does not allow a developer 
to access the headers in a Converter. I'm not suggesting that we change the 
method for serializing them, rather that 
*org.apache.kafka.connect.header.Headers* be passed in for *fromConnectData* 
and *toConnectData*. For example something like this.
{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.storage.Converter;

public interface ExtendedConverter extends Converter {
  byte[] fromConnectData(String topic, Headers headers, Schema schema, Object object);
  SchemaAndValue toConnectData(String topic, Headers headers, byte[] payload);
}

{code}
This would be a similar approach to what was already done with 
ExtendedDeserializer and ExtendedSerializer in the Kafka client.





Build failed in Jenkins: kafka-2.0-jdk8 #113

2018-08-09 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7250: switch scala transform to TransformSupplier (#5481)

--
[...truncated 885.51 KB...]

kafka.zookeeper.ZooKeeperClientTest > 
testBlockOnRequestCompletionFromStateChangeHandler PASSED

kafka.zookeeper.ZooKeeperClientTest > testUnresolvableConnectString STARTED

kafka.zookeeper.ZooKeeperClientTest > testUnresolvableConnectString PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testPipelinedGetData STARTED

kafka.zookeeper.ZooKeeperClientTest > testPipelinedGetData PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChildChangeHandlerForChildChange 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChildChangeHandlerForChildChange 
PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNodeWithChildren 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNodeWithChildren 
PASSED

kafka.zookeeper.ZooKeeperClientTest > testSetDataExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testSetDataExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testMixedPipeline STARTED

kafka.zookeeper.ZooKeeperClientTest > testMixedPipeline PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetDataExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetDataExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testDeleteExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testDeleteExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testSessionExpiry STARTED

kafka.zookeeper.ZooKeeperClientTest > testSessionExpiry PASSED

kafka.zookeeper.ZooKeeperClientTest > testSetDataNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testSetDataNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testDeleteNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testDeleteNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testExistsExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testExistsExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperStateChangeRateMetrics 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperStateChangeRateMetrics PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDeletion STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDeletion PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetAclNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetAclNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testStateChangeHandlerForAuthFailure 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testStateChangeHandlerForAuthFailure 
PASSED

kafka.network.SocketServerTest > testGracefulClose STARTED

kafka.network.SocketServerTest > testGracefulClose PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone PASSED

kafka.network.SocketServerTest > controlThrowable STARTED

kafka.network.SocketServerTest > controlThrowable PASSED

kafka.network.SocketServerTest > testRequestMetricsAfterStop STARTED

kafka.network.SocketServerTest > testRequestMetricsAfterStop PASSED

kafka.network.SocketServerTest > testConnectionIdReuse STARTED

kafka.network.SocketServerTest > testConnectionIdReuse PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testProcessorMetricsTags STARTED

kafka.network.SocketServerTest > testProcessorMetricsTags PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > testConnectionId STARTED

kafka.network.SocketServerTest > testConnectionId PASSED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics STARTED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics PASSED

kafka.network.SocketServerTest > testNoOpAction STARTED

kafka.network.SocketServerTest > testNoOpAction PASSED

kafka.network.SocketServerTest > simpleRequest STARTED

kafka.network.SocketServerTest > simpleRequest PASSED

kafka.network.SocketServerTest > closingChannelException STARTED

kafka.network.SocketServerTest > closingChannelException PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingInProgress STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingInProgress PASSED

kafka.network.SocketServerTest > 

[jira] [Created] (KAFKA-7272) Fix ignored test in streams_upgrade_test.py: test_simple_upgrade_downgrade

2018-08-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7272:
---

 Summary: Fix ignored test in streams_upgrade_test.py: 
test_simple_upgrade_downgrade
 Key: KAFKA-7272
 URL: https://issues.apache.org/jira/browse/KAFKA-7272
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


Fix starting from the oldest version that ignores the test



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2018-08-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7271:
---

 Summary: Fix ignored test in streams_upgrade_test.py: 
test_upgrade_downgrade_brokers
 Key: KAFKA-7271
 URL: https://issues.apache.org/jira/browse/KAFKA-7271
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


Fix in the oldest branch that ignores the test and cherry-pick forward.





[jira] [Created] (KAFKA-7270) Add latest releases to streams_upgrade_test.py

2018-08-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7270:
---

 Summary: Add latest releases to streams_upgrade_test.py
 Key: KAFKA-7270
 URL: https://issues.apache.org/jira/browse/KAFKA-7270
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


Namely: 2.0, 1.1.1, 1.0.2, 0.11.0.3, and 0.10.2.2.

Note that the relevant older branches need to add the relevant bugfix versions 
for themselves and their ancestor versions.

The 2.0 branch actually needs to add the 2.0 version!

Trunk needs to add all the versions.





Build failed in Jenkins: kafka-trunk-jdk8 #2882

2018-08-09 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7250: switch scala transform to TransformSupplier (#5481)

--
[...truncated 874.84 KB...]
kafka.security.auth.ResourceTest > 
shouldThrowOnTwoPartStringWithUnknownResourceType PASSED

kafka.security.auth.ResourceTest > shouldThrowOnBadResourceTypeSeparator STARTED

kafka.security.auth.ResourceTest > shouldThrowOnBadResourceTypeSeparator PASSED

kafka.security.auth.ResourceTest > shouldParseThreePartString STARTED

kafka.security.auth.ResourceTest > shouldParseThreePartString PASSED

kafka.security.auth.ResourceTest > shouldRoundTripViaString STARTED

kafka.security.auth.ResourceTest > shouldRoundTripViaString PASSED

kafka.security.auth.ResourceTest > shouldParseThreePartWithEmbeddedSeparators 
STARTED

kafka.security.auth.ResourceTest > shouldParseThreePartWithEmbeddedSeparators 
PASSED

kafka.security.auth.ResourceTypeTest > testJavaConversions STARTED

kafka.security.auth.ResourceTypeTest > testJavaConversions PASSED

kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.OperationTest > testJavaConversions STARTED

kafka.security.auth.OperationTest > testJavaConversions PASSED

kafka.security.auth.PermissionTypeTest > testJavaConversions STARTED

kafka.security.auth.PermissionTypeTest > testJavaConversions PASSED

kafka.security.auth.PermissionTypeTest > testFromString STARTED

kafka.security.auth.PermissionTypeTest > testFromString PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAuthorizeWithPrefixedResource 
STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAuthorizeWithPrefixedResource 
PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testLocalConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testLocalConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDeleteAllAclOnWildcardResource STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDeleteAllAclOnWildcardResource PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyDeletionOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyDeletionOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclInheritance STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclInheritance PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDistributedConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDistributedConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAddAclsOnWildcardResource 
STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAddAclsOnWildcardResource 
PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2 STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2 PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2 STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2 PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDeleteAclOnPrefixedResource 
STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDeleteAclOnPrefixedResource 
PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testEmptyAclThrowsException 
STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testEmptyAclThrowsException PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 

[jira] [Created] (KAFKA-7269) KStream.merge is not documented

2018-08-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7269:
---

 Summary: KStream.merge is not documented
 Key: KAFKA-7269
 URL: https://issues.apache.org/jira/browse/KAFKA-7269
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


If I understand the operator correctly, it should be documented as a stateless 
transformation at 
https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations





Re: [VOTE] KIP-353: Allow Users to Configure Multi-Streams Timestamp Synchronization Behavior

2018-08-09 Thread John Roesler
+1 non-binding

On Thu, Aug 9, 2018 at 3:14 PM Matthias J. Sax 
wrote:

> +1 (binding)
>
> On 8/9/18 11:57 AM, Guozhang Wang wrote:
> > Hello all,
> >
> > I would like to start the voting process on the following KIP, to allow
> > users to control when a task can be processed based on its buffered
> > records, and how the stream time of a task is advanced.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
> >
> >
> >
> > Thanks,
> > -- Guozhang
> >
>
>


Re: [VOTE] KIP-353: Allow Users to Configure Multi-Streams Timestamp Synchronization Behavior

2018-08-09 Thread Matthias J. Sax
+1 (binding)

On 8/9/18 11:57 AM, Guozhang Wang wrote:
> Hello all,
> 
> I would like to start the voting process on the following KIP, to allow
> users to control when a task can be processed based on its buffered records,
> and how the stream time of a task is advanced.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
> 
> 
> 
> Thanks,
> -- Guozhang
> 





Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-08-09 Thread John Roesler
Hi Matthias,

I think this KIP is looking really good.

I have a few thoughts to add to the others:

1. You mentioned at one point users needing to configure
`upgrade.mode="null"`. I think this was a typo and you meant to say they
should remove the config. If they really have to set it to a string "null"
or even set it to a null value but not remove it, it would be unfortunate.

2. In response to Bill's comment #1, you said that "The idea is that the
upgrade should be robust and not fail. We need to write according tests".
I may have misunderstood the conversation, but I don't think it's within
our power to say that an instance won't fail. What if one of my computers
catches on fire? What if I'm deployed in the cloud and one instance
disappears and is replaced by a new one? Or what if one instance goes AWOL
for a long time and then suddenly returns? How will the upgrade process
behave in light of such failures?

3. Your thought about making in-place an offline mode is interesting, but
it might be a bummer for on-prem users who wish to upgrade online, but
cannot just add new machines to the pool. It could be a new upgrade mode
"offline-in-place", though...

4. I was surprised to see that a user would need to modify the topology to
do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's
suggestions would remove this necessity.

Thanks for taking on this very complex but necessary work.

-John

On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang  wrote:

> Hello Matthias,
>
> Thanks for the updated KIP. Some more comments:
>
> 1. The current set of proposed APIs is a bit too complicated, which makes
> the upgrade flow from the user's perspective also a bit complex. I'd like to
> check the different APIs and discuss their needs separately:
>
> 1.a. StoreProxy: needed for in-place upgrade only, between the first
> and second rolling bounce, where the old-versioned stores can handle
> new-versioned store APIs. I think such upgrade paths (i.e. from one store
> type to another) would not be very common: users may want to upgrade from a
> certain store engine to another, but the interface would likely be staying
> the same. Hence personally I'd suggest we keep it internally and only
> consider exposing it in the future if it does become a common pattern.
>
> 1.b. ConverterStore / RecordConverter: needed for both in-place and
> roll-over upgrade, between the first and second rolling bounces, for the
> new versioned store to be able to read old-versioned changelog topics.
> Firstly I think we should not expose key in the public APIs but only the
> values, since allowing key format changes would break log compaction, and
> hence would not be compatible anyways. As for value format changes,
> personally I think we can also keep its upgrade logic internally as it may
> not be worth generalizing to user customizable logic.
>
> 1.c. If you agree with 1.a/b above, then we can also remove "
> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the public APIs.
>
> 1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" is not
> needed either given that we are exposing "ValueAndTimestamp" anyways. I.e.
> it is just syntactic sugar and for IQ, users can always just set a "
> QueryableStoreType>>" as the new
> interface does not provide any additional functions.
>
>
> 2. Could we further categorize the upgrade flow for different use cases,
> e.g. 1) DSL users where KeyValueWithTimestampStore will be used
> automatically for non-windowed aggregate; 2) PAPI users who do not need to
> use KeyValueWithTimestampStore; 3) PAPI users who do want to switch to
> KeyValueWithTimestampStore. Just to give my understanding for 3), the
> upgrade flow for users may be simplified as the following (for both
> in-place and roll-over):
>
> * Update the jar to new version, make code changes from KeyValueStore
> to KeyValueWithTimestampStore, set upgrade config.
>
> * First rolling bounce, and library code can internally use proxy /
> converter based on the specified config to handle new APIs with old stores,
> while letting new stores read from old changelog data.
>
> * Reset upgrade config.
>
> * Second rolling bounce, and the library code automatically turns off
> logic for proxy / converter.
>
>
> 3. Some more detailed proposals are needed for when to recommend users to
> trigger the second rolling bounce. I have one idea to share here: we add a
> new state to KafkaStreams, say UPGRADING, which is set when 1) upgrade
> config is set, and 2) the new stores are still ramping up (for the second
> part, we can start with some internal hard-coded heuristics to decide when
> it is close to being ramped up). If either one of these is no longer true, it
> should transition to RUNNING. Users can then watch this state, and decide
> to only trigger the second rebalance when the state has transitioned from
> UPGRADING. They can also choose to cut over while the instance is still
> UPGRADING, the downside is that after that 

Re: Kafka stream - Internal topic name and schema avro compatibility

2018-08-09 Thread John Roesler
Hi all,

It definitely would be nice to reuse whatever state is still valid after a
topology update, and KIP-307 is indeed likely what we have to do to solve
that problem. The discuss thread for the KIP hasn't gotten a lot of traffic
recently, so it might be nice if you reply to it with your thoughts to keep
things moving.

I agree with Adam that you can try some acrobatics to keep the state
around, but it's likely to get very messy very fast. Using "auto offset
reset = earliest" would let you rebuild from scratch, but only if your
input topics are all set to have long retention periods (otherwise, the
data you previously processed will be gone). Probably, you'd use topic
compaction as well.

It does seem suitable to skip Schema Registry for internal topics. I view
the registry as being most important for your interfaces with other systems
(topics others have produced for you or you have produced for others). For
an internal topic, you are both producer and consumer, so you can embed the
schema in your program and just publish the serialized data to the internal
topic.

Alternatively, I think that the app reset tool should delete the changelog
topics anyway, so writing a script to run the app reset tool, and then
delete all the app's registry data should be perfectly fine.

-John

On Thu, Aug 9, 2018 at 9:59 AM Adam Bellemare 
wrote:

> Hi Cédric
>
> 1. You should be able to rebuild your internal state after doing a full
> reset (Deleting all internal topics). The application will just need to
> consume data from the beginning of the input streams and rebuild the state
> accordingly. If you don't want to lose the state, or if it is too expensive
> to rebuild the entire application, you may wish to look into external
> storage options instead. In my experience, this begins to get more
> complicated though, so I prefer to just rebuild the state.
>
> 2. I use Scala for my Kafka Streams work, in particular this very helpful
> library which converts case classes into Avro format and back:
> https://github.com/sksamuel/avro4s
>
> In my solution, I only send the encoded Avro data but not the schema. It is
> up to the developer of the application to know which data is being stored
> in which internal topic. This proves to be very easy since I'm only ever
> handling Plain-Ol-Java/Scala-Objects in the Kafka Streams DSL.
> I should add that you are not required to use any schema for internal
> topics, but I much prefer to anyways because I find it the simplest and
> least error prone out of all my options.
> In terms of performance, there is no difference from that of using the
> schema registry ones since the process is the same.
>
> Hope this helps a bit.
>
> Adam
>
>
>
>
>
>
> On Thu, Aug 9, 2018 at 4:33 AM, Cedric BERTRAND <
> bertrandcedric@gmail.com> wrote:
>
> > Thanks John and Adam for your answer,
> >
> > After investigation, I am in exactly the case you describe, John.
> > After a modification to my topology, a KEY-SELECT processor got the same
> > number as an old KEY-SELECT processor with an associated repartition
> > topic.
> > We use the app reset tool to clean all internal topics, but the tool
> > doesn't clean the schema registry.
> >
> > I see two solutions to this problem when it occurs.
> >
> > 1. Clean all internal topics and subjects in the schema registry
> > The problem with this solution is that I also clean the internal changelog
> > topics. Sometimes I don't want to lose this internal state.
> >
> > 2. Don't use the schema registry for internal topics (the solution
> > described by Adam)
> > Without the schema registry, do I send the whole object (data + Avro
> > schema) into Kafka?
> > What about performance with this solution?
> >
> >
> > The solution of giving an explicit name to every operator seems
> > interesting for solving this problem.
> >
> > I found this KIP, which proposes to implement this solution.
> >
> > KIP-307: Allow to define custom processor names with KStreams DSL
> >  > 307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL>
> >
> > I know that the probability of a KEY-SELECT node getting the same number
> > as an old one is very low.
> > But when it occurs, it's extremely hard to understand.
> >
> > Thanks for your time,
> >
> > Cédric
> >
> >
> >
> >
> >
> > Le mer. 8 août 2018 à 22:34, John Roesler  a écrit :
> >
> > > Hi Cédric,
> > >
> > > The suffix is generated when we build the topology in such a way to
> > > guarantee each node/internal-topic/state-store gets a unique name.
> > >
> > > Generally speaking, it is unsafe to modify the topology and restart it.
> > > We recommend using the app reset tool whenever you update your topology.
> > >
> > > That said, some changes to the topology might be safe, so your mileage
> > > may vary; just be aware that changing the topology in place will
> > > potentially produce corrupt data.
> > >
> > > The main example I'd give is if you were to restructure your 

[jira] [Resolved] (KAFKA-6211) PartitionAssignmentState is protected whereas KafkaConsumerGroupService.describeGroup() returns PartitionAssignmentState Object

2018-08-09 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6211.
--
Resolution: Won't Fix

Closing this in favour of AdminClient API.  

> PartitionAssignmentState is protected whereas 
> KafkaConsumerGroupService.describeGroup() returns PartitionAssignmentState 
> Object
> ---
>
> Key: KAFKA-6211
> URL: https://issues.apache.org/jira/browse/KAFKA-6211
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, 
> 0.10.2.1, 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Subhransu Acharya
>Priority: Critical
>  Labels: patch
> Attachments: a.png
>
>   Original Estimate: 1m
>  Remaining Estimate: 1m
>
>  KafkaConsumerGroupService.describeGroup() is returning 
> Tuple2, Option>> but 
> ConsumerGroupCommand has PartitionAssignmentState as a protected class inside 
> it.
> There is no way to create an instance of PartitionAssignmentState.
> make it default in order to use the describe command.





[jira] [Resolved] (KAFKA-6095) Error when displaying LAG information consumer partitions that use zookeeper

2018-08-09 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6095.
--
Resolution: Auto Closed

Closing as Scala consumer related code/tools were removed in the 2.0 release.
It looks like the above issue is related to third-party client behaviour against
the 0.11.0.1 broker version. Please raise the issue with the client project.

> Error when displaying LAG information consumer partitions that use zookeeper
> 
>
> Key: KAFKA-6095
> URL: https://issues.apache.org/jira/browse/KAFKA-6095
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.1
> Environment: java version "1.8.0_144"
>Reporter: Jorge
>Priority: Minor
>
> /opt/kafka/bin/kafka-consumer-groups.sh --zookeeper localhost --group logs 
> --describe
> Note: This will only show information about consumers that use ZooKeeper (not 
> those using the Java consumer API).
> [2017-10-20 10:09:01,182] ERROR error parsing consumer json string { "logs": 
> 1 } (kafka.consumer.TopicCount$)
> kafka.common.KafkaException: error constructing TopicCount : { "logs": 1 }
> at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:72)
> at 
> kafka.utils.ZkUtils$$anonfun$getTopicsPerMemberId$1.apply(ZkUtils.scala:862)
> at 
> kafka.utils.ZkUtils$$anonfun$getTopicsPerMemberId$1.apply(ZkUtils.scala:861)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at kafka.utils.ZkUtils.getTopicsPerMemberId(ZkUtils.scala:861)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.collectGroupAssignment(ConsumerGroupCommand.scala:281)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeGroup(ConsumerGroupCommand.scala:171)
> at 
> kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:235)
> at 
> kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:74)
> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> Error: Executing consumer group command failed due to error constructing 
> TopicCount : { "logs": 1 }





[jira] [Resolved] (KAFKA-6432) Lookup indices may cause unnecessary page fault

2018-08-09 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6432.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

> Lookup indices may cause unnecessary page fault
> ---
>
> Key: KAFKA-6432
> URL: https://issues.apache.org/jira/browse/KAFKA-6432
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: Ying Zheng
>Assignee: Ying Zheng
>Priority: Major
> Fix For: 2.1.0
>
> Attachments: Binary Search - Diagram 1.png, Binary Search - Diagram 
> 2.png
>
>
> For each topic-partition, the Kafka broker maintains two indices: one for message
> offset, one for message timestamp. By default, a new index entry is appended
> to each index for every 4 KB of messages. The lookup of the indices is a simple
> binary search. The indices are mmapped files, cached by the Linux page cache.
> Both consumer fetch and follower fetch have to do an offset lookup before
> accessing the actual message data. The simple binary search algorithm used
> for looking up the index is not cache friendly, and may cause page faults
> even on high QPS topic-partitions.
> For example (diagram 1), when looking up an index entry in page 12, the
> binary search algorithm has to read pages 0, 6, 9 and 11. After new messages
> are appended to the topic-partition, the index grows to 13 pages. Now, if a
> follower fetch request looks up the 1st index entry of page 13, the binary
> search algorithm will go to pages 0, 7, 10 and 12. Among those pages, pages 7
> and 10 have not been used for a long time, and may already be swapped to hard
> disk.
> Actually, in a normal Kafka broker, all the follower fetch requests and most 
> consumer fetch requests should only look up the last few entries of the 
> index. We can make the index lookup more cache friendly, by searching in the 
> last one or two pages of the index first. (Diagram 2)
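
The idea in the last paragraph of the description (search the tail of the index first) can be illustrated with a small sketch. This is not the code that was merged for KAFKA-6432; it is a minimal Python model under stated assumptions: the index is a sorted in-memory list standing in for the mmapped file, and `lookup` and `warm_entries` are hypothetical names chosen for this example.

```python
from bisect import bisect_right


def lookup(offsets, target, warm_entries=4):
    """Return the position of the largest entry <= target, or -1 if none.

    offsets      -- sorted list of index keys (stand-in for the mmapped index)
    warm_entries -- hypothetical size of the tail section that is searched first
    """
    n = len(offsets)
    if n == 0:
        return -1

    # The "warm" section is the tail of the index, which recent fetches
    # keep touching and which is therefore likely to be in the page cache.
    warm_start = max(0, n - warm_entries)

    if target >= offsets[warm_start]:
        # Common case: the binary search never leaves the warm section.
        lo, hi = warm_start, n
    else:
        # Rare case: fall back to the older ("cold") part of the index.
        lo, hi = 0, warm_start

    # bisect_right gives the insertion point; the entry before it is the
    # largest key that does not exceed the target.
    return bisect_right(offsets, target, lo, hi) - 1
```

With this split, a lookup near the end of the index only probes entries in the tail section, so growing the index does not change which older pages a tail lookup touches. The result is the same as a plain binary search over the whole list; only the set of entries visited differs.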





[VOTE] KIP-353: Allow Users to Configure Multi-Streams Timestamp Synchronization Behavior

2018-08-09 Thread Guozhang Wang
Hello all,

I would like to start the voting process on the following KIP, to allow
users to control when a task can be processed based on its buffered records,
and how the stream time of a task is advanced.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization



Thanks,
-- Guozhang


Re: [DISCUSS] KIP-353: Allow Users to Configure Kafka Streams Timestamp Synchronization

2018-08-09 Thread Guozhang Wang
Thanks Matthias, will update the KIP accordingly.

On Thu, Aug 9, 2018 at 11:26 AM, Matthias J. Sax 
wrote:

> @Guozhang, I think you can start the VOTE for this KIP? I don't have any
> further comments.
>
> One more nit: we should explicitly state, that the new config is
> wall-clock time based.
>
>
> -Matthias
>
>
> On 8/7/18 12:59 PM, Matthias J. Sax wrote:
> > Correct. It's not about reordering. Records will still be processed in
> > offset-order per partition.
> >
> > For multi-partition tasks (like joins), we use the timestamp of the
> > "head" record of each partition to determine which record to process
> > first (to process records across partitions in timestamp order if
> > possible) -- however, if one partition does not have a record, we cannot
> > make the decision which record to pick next. Thus, the task blocks and
> > we don't want to block forever. If we unblock on missing data, we might
> > get out-of-order processing with regard to timestamps between two
> > partitions.
> >
> >
> > -Matthias
> >
> > On 8/7/18 12:03 PM, Thomas Becker wrote:
> >> In typing up a scenario to illustrate my question, I think I found the
> >> answer ;) We are not assuming timestamps will be strictly increasing within
> >> a topic and trying to make processing order deterministic even in the face
> >> of that. Thanks for making me think about it (or please correct me if I'm
> >> wrong).
> >>
> >>
> >> On Tue, 2018-08-07 at 10:48 -0700, Matthias J. Sax wrote:
> >>
> >> @Thomas, just to rephrase (from my understanding):
> >>
> >>
> >> So in the scenario you describe, where one topic has
> >>
> >> vastly lower throughput, you're saying that when the lower throughput
> topic
> >>
> >> is fully caught up (no messages in the buffer), the task will idle
> rather
> >>
> >> than using the timestamp of the last message it saw from that topic?
> >>
> >>
> >> The idea of the KIP is to configure a max blocking time for tasks. This
> >>
> >> allows providing a 'grace period' such that new data for the empty
> >>
> >> partition can be received (if an upstream producer still writes data,
> >>
> >> but with low throughput). After the blocking timeout expires (in case
> >>
> >> throughput is too small or upstream producer died), the task will
> >>
> >> process data even if there are empty input topic partitions (because we
> >>
> >> cannot block forever in a stream processing scenario).
> >>
> >>
> >> Not sure, what you mean by "using the timestamp of the last message"?
> >>
> >> Using for what? The KIP is about processing records of partitions that
> >>
> >> do have data if some other partitions are empty (and to allow users to
> >>
> >> configure a max "blocking time" until processing is "forced").
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 8/7/18 10:02 AM, Guozhang Wang wrote:
> >>
> >> @Tommy
> >>
> >>
> >> Yes that's the intent. Again note that the current behavior is indeed
> "just
> >>
> >> using the timestamp of the last message I saw", and continue processing
> >>
> >> what's in the buffer from other streams, but this may introduce
> >>
> >> out-of-ordering.
> >>
> >>
> >>
> >> Guozhang
> >>
> >>
> >>
> >> On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker
> >>
> >> wrote:
> >>
> >>
> >> Thanks Guozhang. So in the scenario you describe, where one topic has
> >>
> >> vastly lower throughput, you're saying that when the lower throughput
> topic
> >>
> >> is fully caught up (no messages in the buffer), the task will idle
> rather
> >>
> >> than using the timestamp of the last message it saw from that topic?
> >>
> >> Initially I was under the impression that this would only happen when
> the
> >>
> >> task had not yet seen any messages from one of the partitions.
> >>
> >>
> >> Regarding choosing, you are exactly right. This mechanism is pluggable
> in
> >>
> >> Samza, and I'd like to see something similar done in Kafka Streams. The
> >>
> >> timestamp based choosing policy is great and makes sense in a lot of
> >>
> >> scenarios, but having something like a priority based policy would be
> very
> >>
> >> nice for some of our usecases.
> >>
> >>
> >> -Tommy
> >>
> >>
> >> On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:
> >>
> >>
> >> @Ted
> >>
> >>
> >>
> >> Yes, I will update the KIP mentioning this as a separate consideration.
> >>
> >>
> >>
> >>
> >> @Thomas
> >>
> >>
> >>
> >> The idle period may be happening during the processing as well. Think:
> if
> >>
> >>
> >> you are joining two streams with very different throughput traffic, say
> for
> >>
> >>
> >> an extreme case, one stream comes in as 100K messages / sec, another
> comes
> >>
> >>
> >> in as 1 message / sec. Then it could happen from time to time that we
> have
> >>
> >>
> >> reached the tail of the low-traffic stream and do not have any data
> >>
> >>
> >> received yet from that stream, while the other stream still has
> >>
> >>
> >> unprocessed buffered data. Currently we will always go ahead and just
> >>
> >>
> >> 

Re: [DISCUSS] KIP-353: Allow Users to Configure Kafka Streams Timestamp Synchronization

2018-08-09 Thread John Roesler
I also have no comments. The KIP looks good to me.
-John

On Thu, Aug 9, 2018 at 1:26 PM Matthias J. Sax 
wrote:

> @Guozhang, I think you can start the VOTE for this KIP? I don't have any
> further comments.
>
> One more nit: we should explicitly state, that the new config is
> wall-clock time based.
>
>
> -Matthias
>
>

Re: [DISCUSS] KIP-353: Allow Users to Configure Kafka Streams Timestamp Synchronization

2018-08-09 Thread Bill Bejeck
@Guozhang, I've read the KIP and I don't have any further comments in
addition to what's already been discussed.

Thanks,
Bill

On Thu, Aug 9, 2018 at 2:26 PM Matthias J. Sax 
wrote:

> @Guozhang, I think you can start the VOTE for this KIP? I don't have any
> further comments.
>
> One more nit: we should explicitly state, that the new config is
> wall-clock time based.
>
>
> -Matthias
>
>

Re: [DISCUSS] KIP-353: Allow Users to Configure Kafka Streams Timestamp Synchronization

2018-08-09 Thread Matthias J. Sax
@Guozhang, I think you can start the VOTE for this KIP? I don't have any
further comments.

One more nit: we should explicitly state that the new config is
wall-clock time based.


-Matthias


On 8/7/18 12:59 PM, Matthias J. Sax wrote:
> Correct. It's not about reordering. Records will still be processed in
> offset-order per partition.
> 
> For multi-partition task (like joins), we use the timestamp of the
> "head" record of each partition to determine which record to process
> first (to process records across partitions in timestamp order if
> possible) -- however, if one partition does not have a record, we cannot
> make the decision which record to pick next. Thus, the task blocks and
> we don't want to block forever. If we unblock on missing data, we might
> get out-of-order processing with regard to timestamps between two
> partitions.
> 
> 
> -Matthias
> 
> On 8/7/18 12:03 PM, Thomas Becker wrote:
>> In typing up a scenario to illustrate my question, I think I found the 
>> answer ;) We are not assuming timestamps will be strictly increasing within 
>> a topic and trying to make processing order deterministic even in the face 
>> of that. Thanks for making me think about it (or please correct me if I'm 
>> wrong).
>>
>>
>> On Tue, 2018-08-07 at 10:48 -0700, Matthias J. Sax wrote:
>>
>> @Thomas, just to rephrase (from my understanding):
>>
>> So in the scenario you describe, where one topic has
>> vastly lower throughput, you're saying that when the lower throughput topic
>> is fully caught up (no messages in the buffer), the task will idle rather
>> than using the timestamp of the last message it saw from that topic?
>>
>> The idea of the KIP is to configure a max blocking time for tasks. This
>> provides a 'grace period' such that new data for the empty
>> partition can be received (if an upstream producer still writes data,
>> but with low throughput). After the blocking timeout expires (in case
>> throughput is too small or upstream producer died), the task will
>> process data even if there are empty input topic partitions (because we
>> cannot block forever in a stream processing scenario).
>>
>> Not sure what you mean by "using the timestamp of the last message"?
>> Using for what? The KIP is about processing records of partitions that
>> do have data if some other partitions are empty (and to allow users to
>> configure a max "blocking time" until processing is "forced").
>>
>> -Matthias
>>
>> On 8/7/18 10:02 AM, Guozhang Wang wrote:
>>
>> @Tommy
>>
>> Yes that's the intent. Again note that the current behavior is indeed "just
>> using the timestamp of the last message I saw", and continue processing
>> what's in the buffer from other streams, but this may introduce
>> out-of-ordering.
>>
>> Guozhang
>>
>> On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker wrote:
>>
>> Thanks Guozhang. So in the scenario you describe, where one topic has
>> vastly lower throughput, you're saying that when the lower throughput topic
>> is fully caught up (no messages in the buffer), the task will idle rather
>> than using the timestamp of the last message it saw from that topic?
>> Initially I was under the impression that this would only happen when the
>> task had not yet seen any messages from one of the partitions.
>>
>> Regarding choosing, you are exactly right. This mechanism is pluggable in
>> Samza, and I'd like to see something similar done in Kafka Streams. The
>> timestamp based choosing policy is great and makes sense in a lot of
>> scenarios, but having something like a priority based policy would be very
>> nice for some of our usecases.
>>
>> -Tommy
>>
>> On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:
>>
>> @Ted
>>
>> Yes, I will update the KIP mentioning this as a separate consideration.
>>
>> @Thomas
>>
>> The idle period may be happening during the processing as well. Think: if
>> you are joining two streams with very different throughput traffic, say for
>> an extreme case, one stream comes in as 100K messages / sec, another comes
>> in as 1 message / sec. Then it could happen from time to time that we have
>> reached the tail of the low-traffic stream and do not have any data
>> received yet from that stream, while the other stream still has
>> unprocessed buffered data. Currently we will always go ahead and just
>> process the other stream's buffered data, but bear in mind that when we
>> eventually receive the data from the low-traffic stream we may realize
>> that its timestamp is even smaller than what we have already processed, and
>> hence we have accidentally introduced out-of-order data.
>>
>> What you described in stream-table joins is also a common case (e.g.
>> https://issues.apache.org/jira/browse/KAFKA-4113). 
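
For readers following the thread, the blocking behaviour discussed above can be sketched roughly as below. This is a toy model under stated assumptions, not the Kafka Streams implementation: the class name, the `max_idle_ms` parameter name and the injectable `clock` are all made up for illustration. It picks the record with the smallest head timestamp across partitions, blocks while any partition is empty, and forces processing once the wall-clock grace period has expired.

```python
from collections import deque


class TaskSketch:
    """Toy model of a multi-partition task (hypothetical, for illustration only)."""

    def __init__(self, partitions, max_idle_ms, clock):
        self.buffers = {p: deque() for p in partitions}  # per-partition record buffers
        self.max_idle_ms = max_idle_ms  # hypothetical name for the max blocking time
        self.clock = clock              # callable returning wall-clock time in ms
        self.idle_since = None          # wall-clock time at which blocking started

    def add(self, partition, timestamp, value):
        self.buffers[partition].append((timestamp, value))

    def poll(self):
        """Return the next (partition, timestamp, value), or None while blocking."""
        non_empty = {p: b for p, b in self.buffers.items() if b}
        if not non_empty:
            return None
        if len(non_empty) < len(self.buffers):
            # At least one partition has no head record, so the choice is not
            # safe yet: wait, but only for max_idle_ms of wall-clock time.
            now = self.clock()
            if self.idle_since is None:
                self.idle_since = now
            if now - self.idle_since < self.max_idle_ms:
                return None
            # Grace period expired: processing is forced and may be out of order.
        else:
            self.idle_since = None
        # Offset order is kept per partition; across partitions the smallest
        # head timestamp wins.
        chosen = min(non_empty, key=lambda p: non_empty[p][0][0])
        timestamp, value = non_empty[chosen].popleft()
        return (chosen, timestamp, value)
```

Passing the clock in as a callable keeps the sketch testable and makes it explicit that the timeout is measured in wall-clock time rather than in record timestamps.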

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-08-09 Thread Matthias J. Sax
Hi,

thanks for the detailed discussion. I learned a lot about internals again :)

I like the idea of a user config `member.name` and of keeping `member.id`
internal. I also agree with Guozhang that reusing `client.id` might not
be a good idea.

To clarify the algorithm: each time we generate a new `member.id`, we
also need to update the "group membership" information (i.e., the mapping
[member.id, Assignment]), right? That is, the new `member.id` replaces the
old entry in the cache.

I also think we need to preserve the `member.name -> member.id` mapping
in the `__consumer_offsets` topic. The KIP should mention this IMHO.

On changing the default value of the config `leave.group.on.close`: I agree
with John that we should not change the default, because it
would impact all consumer groups with dynamic assignment. However, I
think we can document that if static assignment is used (i.e.,
`member.name` is configured) we never send a LeaveGroupRequest
regardless of the config. Note that the config is internal, so I am not sure
how to document this in detail. We should not expose the internal config
in the docs.

About upgrading: why do we need to have two rolling bounces and encode
"static" vs "dynamic" in the JoinGroupRequest?

If we upgrade an existing consumer group from dynamic to static, I don't
see any reason why both should not work together, or why a single rolling
bounce would not be sufficient. If we bounce the first consumer and
switch it from dynamic to static, it sends a `member.name` and the broker
registers the [member.name, member.id] in the cache. Why would this
interfere with all the other consumers that use dynamic assignment?

Also, Guozhang mentioned that for all other requests, we need to check if
the mapping [member.name, member.id] contains the sent `member.id` -- I
don't think this is necessary -- it seems to be sufficient to check the
`member.id` from the [member.id, Assignment] mapping as we do today --
thus, checking `member.id` does not require any change IMHO.


-Matthias
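
To make the bookkeeping described above concrete, here is a minimal sketch, assuming the coordinator keeps two in-memory maps. The class and method names are hypothetical and this is not the broker's code. A static member that rejoins with the same `member.name` gets a fresh `member.id` that replaces the old entry, dynamic members coexist in the same group, and validating other requests only consults the [member.id, Assignment] map.

```python
import uuid


class GroupCacheSketch:
    """Hypothetical coordinator-side cache; illustrates the mappings only."""

    def __init__(self):
        self.name_to_id = {}   # member.name -> member.id (static members only)
        self.assignments = {}  # member.id -> assignment (all members)

    def join(self, member_name=None):
        """Register a member and return its newly generated member.id."""
        new_id = uuid.uuid4().hex
        assignment = None
        if member_name is not None:
            old_id = self.name_to_id.get(member_name)
            if old_id is not None:
                # The new member.id replaces the old entry and keeps its assignment.
                assignment = self.assignments.pop(old_id, None)
            self.name_to_id[member_name] = new_id
        self.assignments[new_id] = assignment
        return new_id

    def assign(self, member_id, assignment):
        if member_id not in self.assignments:
            raise KeyError("unknown member.id")
        self.assignments[member_id] = assignment

    def is_known(self, member_id):
        # Same check for static and dynamic members: only the
        # [member.id, Assignment] map is consulted.
        return member_id in self.assignments
```

Persisting `name_to_id` (for example in `__consumer_offsets`, as suggested above) is left out of the sketch.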


On 8/7/18 7:13 PM, Guozhang Wang wrote:
> @James
> 
> What you described is true: the transition from dynamic to static
> membership is not thought through yet. But I do not think it is an
> impossible problem: note that we indeed moved the offset commit from ZK to
> kafka coordinator in 0.8.2 :) The migration plan is to first do
> double-commits on both zk and coordinator, and then do a second round to
> turn the zk off.
> 
> So just to throw a wild idea here: also following a two-rolling-bounce
> manner, in the JoinGroupRequest we can set the flag to "static" while keeping
> the registry-id field empty still; in this case, the coordinator still
> follows the logic of "dynamic", accepting the request while allowing the
> protocol to be set to "static"; after the first rolling bounce, the group
> protocol is already "static", then a second rolling bounce is triggered and
> this time we set the registry-id.
> 
> 
> Guozhang
> 
> On Tue, Aug 7, 2018 at 1:19 AM, James Cheng  wrote:
> 
>> Guozhang, in a previous message, you proposed this:
>>
>>> On Jul 30, 2018, at 3:56 PM, Guozhang Wang  wrote:
>>>
>>> 1. We bump up the JoinGroupRequest with additional fields:
>>>
>>>  1.a) a flag indicating "static" or "dynamic" membership protocols.
>>>  1.b) with "static" membership, we also add the pre-defined member id.
>>>  1.c) with "static" membership, we also add an optional
>>> "group-change-timeout" value.
>>>
>>> 2. On the broker side, we enforce only one of the two protocols for all
>>> group members: we accept the protocol on the first joined member of the
>>> group, and if later joining members indicate a different membership
>>> protocol, we reject it. If the group-change-timeout value was different
>> to
>>> the first joined member, we reject it as well.
>>
>>
>> What will happen if we have an already-deployed application that wants to
>> switch to using static membership? Let’s say there are 10 instances of it.
>> As the instances go through a rolling restart, they will switch from
>> dynamic membership (the default?) to static membership. As each one leaves
>> the group and restarts, they will be rejected from the group (because the
>> group is currently using dynamic membership). The group will shrink down
>> until there is 1 node handling all the traffic. After that one restarts,
>> the group will switch over to static membership.
>>
>> Is that right? That means that the transition plan from dynamic to static
>> membership isn’t very smooth.
>>
>> I’m not really sure what can be done in this case. This reminds me of the
>> transition plans that were discussed for moving from zookeeper-based
>> consumers to kafka-coordinator-based consumers. That was also hard, and
>> ultimately we decided not to build that.
>>
>> -James
>>
>>
> 
> 





Re: [DISCUSS] KIP-347: Enable batching in FindCoordinatorRequest

2018-08-09 Thread Yishun Guan
To add more context for KIP-347: https://github.com/apache/kafka/pull/5353

On Wed, Aug 8, 2018 at 3:55 PM Yishun Guan  wrote:

> Hi all,
>
> I would like to start a discussion on:
>
> KIP-347: Enable batching in FindCoordinatorRequest
> https://cwiki.apache.org/confluence/x/CgZPBQ
>
> Thanks @Guozhang Wang  for his help and patience!
>
> Thanks,
> Yishun
>


Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-08-09 Thread Guozhang Wang
Hello Matthias,

Thanks for the updated KIP. Some more comments:

1. The current set of proposed APIs is a bit too complicated, which makes
the upgrade flow from the user's perspective also a bit complex. I'd like to
check the different APIs and discuss their needs separately:

1.a. StoreProxy: needed for in-place upgrade only, between the first
and second rolling bounce, where the old-versioned stores can handle
new-versioned store APIs. I think such upgrade paths (i.e. from one store
type to another) would not be very common: users may want to upgrade from a
certain store engine to another, but the interface would likely be staying
the same. Hence personally I'd suggest we keep it internally and only
consider exposing it in the future if it does become a common pattern.

1.b. ConverterStore / RecordConverter: needed for both in-place and
roll-over upgrade, between the first and second rolling bounces, for the
new versioned store to be able to read old-versioned changelog topics.
Firstly I think we should not expose keys in the public APIs but only the
values, since allowing key format changes would break log compaction, and
hence would not be compatible anyway. As for value format changes,
personally I think we can also keep the upgrade logic internal, as it may
not be worth generalizing to user-customizable logic.

1.c. If you agree with 1.a/b above, then we can also remove
"keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the public APIs.

1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" is not
needed either, given that we are exposing "ValueAndTimestamp" anyway. I.e.
it is just syntactic sugar, and for IQ, users can always just set a "
QueryableStoreType>>" as the new
interface does not provide any additional functions.


2. Could we further categorize the upgrade flow for different use cases,
e.g. 1) DSL users where KeyValueWithTimestampStore will be used
automatically for non-windowed aggregate; 2) PAPI users who do not need to
use KeyValueWithTimestampStore; 3) PAPI users who do want to switch to
KeyValueWithTimestampStore. Just to give my understanding for 3), the
upgrade flow for users may be simplified as the following (for both
in-place and roll-over):

* Update the jar to new version, make code changes from KeyValueStore
to KeyValueWithTimestampStore, set upgrade config.

* First rolling bounce: library code can internally use the proxy /
converter based on the specified config to handle new APIs with old stores,
while letting new stores read from old changelog data.

* Reset upgrade config.

* Second rolling bounce: the library code automatically turns off the
logic for the proxy / converter.


3. Some more detailed proposals are needed on when to recommend that users
trigger the second rolling bounce. I have one idea to share here: we add a
new state to KafkaStreams, say UPGRADING, which is set when 1) the upgrade
config is set, and 2) the new stores are still ramping up (for the second
part, we can start with some internal hard-coded heuristics to decide when
it is close to being ramped up). If either condition is no longer true, it
should transition to RUNNING. Users can then watch this state, and decide
to trigger the second rebalance only when the state has transitioned out of
UPGRADING. They can also choose to cut over while the instance is still
UPGRADING; the downside is that afterwards the application may have a long
restoration phase which is, from the user's point of view, a period of
unavailability.
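
As a rough illustration of the proposed state rule, and nothing more than a sketch: UPGRADING is only a suggested state name here, and the restore-lag threshold below is an invented stand-in for the "internal hard-coded heuristics" mentioned above.

```python
from enum import Enum


class State(Enum):
    UPGRADING = "UPGRADING"  # proposed in this mail, not an existing state
    RUNNING = "RUNNING"


def streams_state(upgrade_config_set, restore_lag, lag_threshold=1000):
    """Return UPGRADING only while both conditions from the proposal hold.

    restore_lag and lag_threshold are hypothetical: they stand in for whatever
    heuristic decides that the new stores are still ramping up.
    """
    still_ramping_up = restore_lag > lag_threshold
    if upgrade_config_set and still_ramping_up:
        return State.UPGRADING
    return State.RUNNING
```

An operator would then poll this state and start the second rolling bounce once it stops reporting UPGRADING.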


Below are just some minor things on the wiki:

4. "proxy story" => "proxy store".

5. "use the a builder " => "use a builder"

6. "we add the record timestamp as a 8-byte (long) prefix to the value":
what's the rationale for putting the timestamp before the value, rather than
after the value?
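
For reference, the layout quoted in point 6 could look like the sketch below. The big-endian byte order and the helper names are assumptions for illustration; the KIP text quoted here only says "8-byte (long) prefix".

```python
import struct


def with_timestamp(timestamp_ms, value):
    """Prepend the record timestamp as an 8-byte signed long (big-endian assumed)."""
    return struct.pack(">q", timestamp_ms) + value


def split_timestamp(stored):
    """Split a stored byte string back into (timestamp_ms, value)."""
    (timestamp_ms,) = struct.unpack(">q", stored[:8])
    return timestamp_ms, stored[8:]
```

One property of a prefix, offered as an observation rather than as the KIP's stated rationale: the timestamp sits at a fixed offset, so it can be read without knowing the length of the value.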



Guozhang


On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax 
wrote:

> Thanks for the feedback Bill. I just update the KIP with some of your
> points.
>
> >> Regarding step 3C of the in-place upgrade (users needing to watch the
> >> restore process), I'm wondering if we want to provide a type of
> >> StateRestoreListener that could signal when the new stores have reached
> >> parity with the existing old stores and that could be the signal to
> start
> >> second rolling rebalance?
>
> I think we can reuse the existing listeners, thus, I did not include
> anything in the KIP. About a signal to rebalance: this might be tricky.
> If we prepare the store "online", the active task will update the state
> continuously, and thus, state prepare is never finished. It will be the
> users responsibility to do the second rebalance (note, that the second
> rebalance will first finish the last delta of the upgrade to finish the
> upgrade before actual processing resumes). I clarified the KIP with this
> regard a little bit.
>
> >> 1. Out of N instances, one fails midway through the process, would we
> allow
> >> the other instances to complete or just fail the entire upgrade?
>
> The idea is that the upgrade 

Re: [DISCUSS] KIP-350: Allow kafka-topics.sh to take brokerid as parameter to show partitions associated with it

2018-08-09 Thread Ratish Ravindran
Thanks all for the suggestions. I'll work to make changes

Regards,
Ratish

On Sat, Aug 4, 2018 at 2:20 PM Attila Sasvári  wrote:

> Thanks for the KIP.
>
> Regarding the motivation:
> - KIP-113 added a new command, kafka-log-dirs.sh, to get the partitions
> associated with a specific broker (however, it does not work on secured
> clusters, see KIP-340).
>
> It would be probably better/more flexible if you could specify multiple
> broker ids. For example, we could use a command option like --broker-list
> (to be consistent with kafka-log-dirs.sh).
>
> --list only lists the topics. If you want to get the list of partitions, you
> would need --describe. I agree with Colin to document error conditions.
>
> Best regards,
> Attila
>
>
>
> On Sat, Aug 4, 2018 at 1:03 AM Colin McCabe  wrote:
>
> > Thanks for the KIP, Ratish.
> >
> > We should probably specify that it is an error when --broker is specified
> > with operations other than --list, right?
> >
> > best,
> > Colin
> >
> >
> > On Wed, Aug 1, 2018, at 21:28, Ratish Ravindran wrote:
> > > Hi,
> > >
> > > I would like to open a discussion thread on KIP-350:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-350%3A+Allow+kafka-topics.sh+to+take+brokerid+as+parameter+to+show+partitions+associated+with+it
> > >
> > > This KIP is to add the optional brokerid parameter in kafka-topics.sh,
> > > to only get the partitions associated with that specific broker.
> > > Currently, we use grep to do so.
> > >
> > > Let me know your thoughts and suggestions.
> > >
> > > Thanks,
> > > Ratish
> >
>
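
The filtering discussed in this thread amounts to something like the sketch below. It is purely illustrative: the function name and the tuple layout are invented, and it does not reflect how kafka-topics.sh is implemented. It accepts several broker ids, in the spirit of the --broker-list suggestion.

```python
def partitions_on_brokers(topic_partitions, broker_ids):
    """Keep the partitions that have at least one replica on the given brokers.

    topic_partitions: iterable of (topic, partition, replica_broker_ids) tuples,
    a hypothetical stand-in for what --describe reports.
    """
    wanted = set(broker_ids)
    return [
        (topic, partition)
        for topic, partition, replicas in topic_partitions
        if wanted & set(replicas)
    ]
```

This does with structured data what the grep workaround mentioned in the KIP does on text output.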


Re: [DISCUSS] KIP-346 - Limit blast radius of log compaction failure

2018-08-09 Thread Stanislav Kozlovski
Hey Jason,

1. *10* is the default value; it says so in the KIP.
2. This is a good catch. As the current implementation stands, it's not a
useful metric since the thread continues to run even if all log directories
are offline (although I'm not sure what the broker's behavior is in that
scenario). I'll make sure the thread stops if all log directories are
offline.

I don't know which "Needs Discussion" item you're referencing, there hasn't
been any in the KIP since August 1 and that was for the metric only. KIP
History


I've updated the KIP to mention the "time-since-last-run" metric.

Thanks,
Stanislav
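
A small sketch of the behaviour being discussed, under several assumptions that the thread does not settle: that uncleanable partitions are counted per log directory, that a directory goes offline once the count exceeds `max.uncleanable.partitions`, and that an `IOException` takes the directory offline immediately. All class and method names are hypothetical.

```python
class CleanerSketch:
    """Hypothetical bookkeeping for uncleanable partitions; not the LogCleaner code."""

    def __init__(self, log_dirs, max_uncleanable_partitions=10):
        self.max_uncleanable = max_uncleanable_partitions
        self.uncleanable = {d: set() for d in log_dirs}  # log dir -> partitions
        self.offline = set()                             # log dirs marked offline

    def on_cleaning_error(self, log_dir, partition, is_io_error=False):
        if is_io_error:
            # Assumed: the first IOException marks the whole log dir offline.
            self.offline.add(log_dir)
            return
        # Other failures only mark the partition as uncleanable.
        self.uncleanable[log_dir].add(partition)
        if len(self.uncleanable[log_dir]) > self.max_uncleanable:
            self.offline.add(log_dir)

    def should_run(self):
        # The cleaner thread stops once every log dir is offline.
        return len(self.offline) < len(self.uncleanable)
```

With this shape, a single bad partition no longer stops cleaning for the whole broker, which is the "blast radius" the KIP title refers to.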

On Wed, Aug 8, 2018 at 12:12 AM Jason Gustafson  wrote:

> Hi Stanislav,
>
> Just a couple quick questions:
>
> 1. I may have missed it, but what will be the default value for
> `max.uncleanable.partitions`?
> 2. It seems there will be some impact for users that are monitoring
> "time-since-last-run-ms" in order to detect cleaner failures. Not sure it's
> a major concern, but probably worth mentioning in the compatibility
> section. Also, is this still a useful metric after this KIP?
>
> Also, maybe the "Needs Discussion" item can be moved to rejected
> alternatives since you've moved to a vote? I think leaving this for
> potential future work is reasonable.
>
> Thanks,
> Jason
>
>
> On Mon, Aug 6, 2018 at 12:29 PM, Ray Chiang  wrote:
>
> > I'm okay with that.
> >
> > -Ray
> >
> > On 8/6/18 10:59 AM, Colin McCabe wrote:
> >
> >> Perhaps we could start with max.uncleanable.partitions and then
> implement
> >> max.uncleanable.partitions.per.logdir in a follow-up change if it seemed
> >> to be necessary?  What do you think?
> >>
> >> regards,
> >> Colin
> >>
> >>
> >> On Sat, Aug 4, 2018, at 10:53, Stanislav Kozlovski wrote:
> >>
> >>> Hey Ray,
> >>>
> >>> Thanks for the explanation. In regards to the configuration property -
> >>> I'm
> >>> not sure. As long as it has sufficient documentation, I find
> >>> "max.uncleanable.partitions" to be okay. If we were to add the
> >>> distinction
> >>> explicitly, maybe it should be `max.uncleanable.partitions.per.logdir`
> ?
> >>>
> >>> On Thu, Aug 2, 2018 at 7:32 PM Ray Chiang  wrote:
> >>>
> >>> One more thing occurred to me.  Should the configuration property be
>  named "max.uncleanable.partitions.per.disk" instead?
> 
>  -Ray
> 
> 
>  On 8/1/18 9:11 AM, Stanislav Kozlovski wrote:
> 
> > Yes, good catch. Thank you, James!
> >
> > Best,
> > Stanislav
> >
> > On Wed, Aug 1, 2018 at 5:05 PM James Cheng 
> > wrote:
> >
> > Can you update the KIP to say what the default is for
> >> max.uncleanable.partitions?
> >>
> >> -James
> >>
> >> Sent from my iPhone
> >>
> >> On Jul 31, 2018, at 9:56 AM, Stanislav Kozlovski <
> >>>
> >> stanis...@confluent.io>
> 
> > wrote:
> >>
> >>> Hey group,
> >>>
> >>> I am planning on starting a voting thread tomorrow. Please do reply
> >>> if
> >>>
> >> you
> >>
> >>> feel there is anything left to discuss.
> >>>
> >>> Best,
> >>> Stanislav
> >>>
> >>> On Fri, Jul 27, 2018 at 11:05 PM Stanislav Kozlovski <
> >>>
> >> stanis...@confluent.io>
> >>
> >>> wrote:
> >>>
> >>> Hey, Ray
> 
>  Thanks for pointing that out, it's fixed now
> 
>  Best,
>  Stanislav
> 
>  On Fri, Jul 27, 2018 at 9:43 PM Ray Chiang 
> >
>  wrote:
> 
> > Thanks.  Can you fix the link in the "KIPs under discussion" table on
> > the main KIP landing page
> > <
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+
>  Improvement+Proposals#
> 
> > ?
> >>>
>  I tried, but the Wiki won't let me.
> >
> > -Ray
> >
> > On 7/26/18 2:01 PM, Stanislav Kozlovski wrote:
> >> Hey guys,
> >>
> >> @Colin - good point. I added some sentences mentioning recent
> >>
> > improvements
> >
> >> in the introductory section.
> >>
> >> *Disk Failure* - I tend to agree with what Colin said - once a
> >> disk
> >>
> > fails,
> >
> >> you don't want to work with it again. As such, I've changed my
> >> mind
> >>
> > and
> >>
> >>> believe that we should mark the LogDir (assume its a disk) as
> >>
> > offline
> 
> > on
> >>
> >>> the first `IOException` encountered. This is the LogCleaner's
> >>
> > current
> 
> > behavior. We shouldn't change that.
> >>
> >> *Respawning Threads* - I believe we should never re-spawn a
> >> thread.
> >>
> > The
> >>
> >>> correct approach in my mind is to either have it stay dead or 

[jira] [Created] (KAFKA-7268) Review Handling Crypto Rules and update ECCN page if needed

2018-08-09 Thread Henri Yandell (JIRA)
Henri Yandell created KAFKA-7268:


 Summary: Review Handling Crypto Rules and update ECCN page if 
needed
 Key: KAFKA-7268
 URL: https://issues.apache.org/jira/browse/KAFKA-7268
 Project: Kafka
  Issue Type: Task
Reporter: Henri Yandell


It is suggested in LEGAL-358 that Kafka contains/uses cryptographic 
functions but does not have an entry on the ECCN page ( 
[http://www.apache.org/licenses/exports/] ).

See [http://www.apache.org/dev/crypto.html] to review and confirm whether you 
should add something to the ECCN page, and if needed, please do so.

The text in LEGAL-358 was:


[~zznate] added a comment - 18/Jan/18 16:59
[~gregSwilliam] I think [~gshapira_impala_35cc] worked on some of that (def. on 
kafka client SSL stuff) and is on the Kafka PMC. She can probably help. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7267) KafkaStreams Scala DSL process method should accept a ProcessorSupplier

2018-08-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7267:
---

 Summary: KafkaStreams Scala DSL process method should accept a 
ProcessorSupplier
 Key: KAFKA-7267
 URL: https://issues.apache.org/jira/browse/KAFKA-7267
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


scala.KafkaStreams#process currently expects a ()=>Processor, which is 
semantically equivalent to a ProcessorSupplier, but it's the only such method 
to do so.

All the similar methods in the Scala DSL take a Supplier like their Java 
counterparts.

Note that on Scala 2.12+, SAM conversion allows callers to pass either a 
function or a supplier when the parameter is a ProcessorSupplier. (But if the 
parameter is a function, you must pass a function)

But on Scala 2.11 and earlier, callers have to pass a function if the parameter 
is a function and a supplier if the parameter is a supplier. This means that 
currently, 2.11 users are confronted with an API that demands they construct 
suppliers for all the methods *except* process, which demands a function.

Mitigating factor: we have some implicits available to convert a Function0 to a 
supplier, and we could add an implicit from ProcessorSupplier to Function0 to 
smooth over the API.

 

What to do about it?

We could just change the existing method to take a ProcessorSupplier instead.
 * 2.12+ users would not notice a difference during compilation, as SAM 
conversion would kick in. However, if they just swap in the new jar without 
recompiling, I think they'd get a NoSuchMethodError.
 * 2.11- users would not be able to compile their existing code. They'd have to 
swap their function out for a ProcessorSupplier or pull the implicit conversion 
into scope.
 * Note that we can delay this action until we drop 2.11 support, and we would 
break no one.

We could deprecate the existing method and add a new one taking a 
ProcessorSupplier.
 * All Scala users would be able to compile their existing code and also swap 
in the new version at runtime.
 * Anyone explicitly passing a function would get a deprecation warning, 
though, regardless of SAM conversion or implicit conversion, since neither 
conversion will kick in if there's actually a method overload expecting a 
function. This would drive everyone to explicitly create a supplier 
(unnecessarily).

We could leave the existing method without deprecating it and add a new one 
taking a ProcessorSupplier.
 * All Scala users would be able to compile their existing code and also swap 
in the new version at runtime.
 * There would be no unfortunate deprecation warnings.
 * The interface would list two process methods, which is untidy.
 * Once we drop 2.11 support, we would just drop the function variant.





Re: Kafka stream - Internal topic name and schema avro compatibility

2018-08-09 Thread Adam Bellemare
Hi Cédric

1. You should be able to rebuild your internal state after doing a full
reset (Deleting all internal topics). The application will just need to
consume data from the beginning of the input streams and rebuild the state
accordingly. If you don't want to lose the state, or if it is too expensive
to rebuild the entire application, you may wish to look into external
storage options instead. In my experience, this begins to get more
complicated though, so I prefer to just rebuild the state.

2. I use Scala for my Kafka Streams work, in particular this very helpful
library which converts case classes into Avro format and back:
https://github.com/sksamuel/avro4s

In my solution, I only send the encoded Avro data but not the schema. It is
up to the developer of the application to know which data is being stored
in which internal topic. This proves to be very easy since I'm only ever
handling Plain-Ol-Java/Scala-Objects in the Kafka Streams DSL.
I should add that you are not required to use any schema for internal
topics, but I much prefer to anyways because I find it the simplest and
least error prone out of all my options.
In terms of performance, there is no difference from that of using the
schema registry ones since the process is the same.

Hope this helps a bit.

Adam






On Thu, Aug 9, 2018 at 4:33 AM, Cedric BERTRAND <
bertrandcedric@gmail.com> wrote:

> Thanks John and Adam for your answer,
>
> After investigation, I am in exactly the case you describe, John.
> After a modification in my topology, a KEY-SELECT processor got the same
> number as an old KEY-SELECT processor, and with it the associated
> repartition topic.
> We use the app reset tool to clean all internal topics, but the tool doesn't
> clean the schema registry.
>
> I see 2 solutions to solve this problem when it occurs.
>
> 1. Clean all internal topics and subjects in the schema registry
> The problem with this solution is that I also clean the internal changelog
> topics. Sometimes I don't want to lose this internal state.
>
> 2. Don't use the schema registry for internal topics (the solution proposed
> by Adam)
> Without the schema registry, do I send the whole object (data + Avro schema)
> into Kafka?
> What about performance with this solution?
>
>
> Giving an explicit name to every operator seems to be an
> interesting way to solve this problem.
>
> I found this KIP which proposes to implement this solution.
>
> KIP-307: Allow to define custom processor names with KStreams DSL
>  307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL>
>
> I know that the probability of a KEY-SELECT node getting the same number as
> an old one is very low.
> But when it occurs, it's extremely hard to understand.
>
> Thanks for your time,
>
> Cédric
>
>
>
>
>
> Le mer. 8 août 2018 à 22:34, John Roesler  a écrit :
>
> > Hi Cédric,
> >
> > The suffix is generated when we build the topology in such a way to
> > guarantee each node/internal-topic/state-store gets a unique name.
> >
> > Generally speaking, it is unsafe to modify the topology and restart it.
> We
> > recommend using the app reset tool whenever you update your topology.
> >
> > That said, some changes to the topology might be safe, so your mileage
> may
> > vary; just be aware that changing the topology in place will potentially
> > produce corrupt data.
> >
> > The main example I'd give is if you were to restructure your topology and
> > you wind up with some other node type, like a "KSTREAM-TRANSFORM-"
> getting
> > number 99, then you won't have a problem. The new node will create
> whatever
> > internal state/topics are needed with a non-colliding name. But if you
> > restructured the topology and a *different* key select happened to get
> > number 99, then you'd have a big problem. Streams would have no idea that
> > the existing repartition topic was for a different key select; it would
> > just start using the existing topic. But this means that the repartition
> > topic would be half one set of data and half another. Clearly, this is
> not
> > good.
> >
> > It sounds to me like this is maybe what happened to you.
> >
> > We have been discussing various mechanisms by which we could support
> > modifying the topology in place. Typically, this would involve giving
> each
> > operator a semantic name so that the internal names would be related to
> > what the nodes are doing, not the order in which the nodes are created.
> >
> > At the very least, we'd like to have some way of detecting that the
> > topology has changed during a restart and refusing to start up, to
> protect
> > the integrity of your data.
> >
> > I hope this helps,
> > -John
> >
> > On Wed, Aug 8, 2018 at 12:51 PM Adam Bellemare  >
> > wrote:
> >
> > > Hi Cédric
> > >
> > > I do not know how the topology names are chosen, but provided that you
> > > didn't change any of the topology then new topics will not be created
> or
> > > require alteration.
> > >
> > > If you 

[jira] [Created] (KAFKA-7266) Fix MetricsTest test flakiness

2018-08-09 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7266:
--

 Summary: Fix MetricsTest test flakiness
 Key: KAFKA-7266
 URL: https://issues.apache.org/jira/browse/KAFKA-7266
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski
 Fix For: 2.0.1, 2.1.0


The test `kafka.api.MetricsTest.testMetrics` has been failing intermittently in 
Kafka builds (recent proof: 
https://github.com/apache/kafka/pull/5436#issuecomment-409683955)
The particular failure is in the `MessageConversionsTimeMs` metric assertion -
{code}
java.lang.AssertionError: Message conversion time not recorded 0.0
{code}

There has been work done previously (https://github.com/apache/kafka/pull/4681) 
to combat the flakiness of the test and while it has improved it, the test 
still fails sometimes.

h3. Solution
On my machine, the test failed 5 times out of 25 runs. I suspect the solution 
would be to increase the record batch size to ensure the conversion takes more 
than 1 ms, so that the metric records a non-zero value. Increasing the maximum 
batch size from 1 MB to 8 MB made the test fail locally once in 100 runs. 
Setting it to 16 MB seems to fix the problem: I have run the test 300 times and 
have not seen a failure with 16 MB set as the batch size.
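
A minimal Java sketch of the suspected cause, assuming the metric records elapsed time in whole milliseconds (this is an assumption, and the class and method names are hypothetical, not taken from the Kafka code base). A conversion that finishes in under 1 ms truncates to 0 and is indistinguishable from "not recorded":

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not Kafka's actual metric code.
class ConversionTimeSketch {

    // Converts an elapsed duration in nanoseconds to whole milliseconds,
    // discarding the fractional part (as TimeUnit conversions do).
    public static long toWholeMillis(long elapsedNanos) {
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    public static void main(String[] args) {
        long smallBatchNanos = 400_000L;   // 0.4 ms: fast conversion of a small batch
        long largeBatchNanos = 3_200_000L; // 3.2 ms: slower conversion of a larger batch

        System.out.println(toWholeMillis(smallBatchNanos)); // 0
        System.out.println(toWholeMillis(largeBatchNanos)); // 3
    }
}
```

Under that assumption, a larger batch only helps because it pushes the conversion time past the 1 ms boundary.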





[jira] [Created] (KAFKA-7265) Broker and consumer metrics topic names are reported differently

2018-08-09 Thread Mathias Kub (JIRA)
Mathias Kub created KAFKA-7265:
--

 Summary: Broker and consumer metrics topic names are reported 
differently
 Key: KAFKA-7265
 URL: https://issues.apache.org/jira/browse/KAFKA-7265
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 1.1.0
Reporter: Mathias Kub


Hi,

I noticed that the *topic* attribute value of 
kafka.consumer:consumer-fetch-manager-metrics is reported differently than for 
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec.

For kafka.consumer:consumer-fetch-manager-metrics *.* (dots) are replaced by 
*_* (underscores), but this is not true for e.g. 
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec.

I am currently using the metrics for monitoring: I extract *topic* from 
kafka.consumer:consumer-fetch-manager-metrics and display 
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec for that topic, 
which does not work when the topic name contains dots.

Please also see https://github.com/apache/kafka/pull/4362#discussion_r203435388.
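
A small Java sketch of the mismatch described above. The class and method names are hypothetical and only mimic the reported behaviour (dots replaced by underscores on the consumer side); this is not the actual metrics code:

```java
// Hypothetical illustration of the naming mismatch; not Kafka's metric code.
class TopicTagSketch {

    // Mimics the consumer-side behaviour described above: dots become underscores.
    public static String consumerMetricTopic(String topic) {
        return topic.replace('.', '_');
    }

    public static void main(String[] args) {
        String topic = "orders.v1";
        String consumerTag = consumerMetricTopic(topic); // "orders_v1"
        String brokerTag = topic;                        // broker metrics keep "orders.v1"

        // A dashboard that joins the two metrics on the topic tag finds no match.
        System.out.println(consumerTag.equals(brokerTag)); // false

        // The mapping is also lossy: two distinct topics share one consumer tag.
        System.out.println(consumerMetricTopic("orders.v1")
                .equals(consumerMetricTopic("orders_v1"))); // true
    }
}
```

Because the replacement is lossy, the original topic name cannot be recovered reliably from the consumer metric alone.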

Thanks,
Mathias





Re: [VOTE] KIP-280: Enhanced log compaction

2018-08-09 Thread Luís Cabral
Hi,


So, after a "short" break, I've finally managed to find time to resume this 
KIP. Sorry to all for the delay.

Continuing the conversation about whether the configurations are global or 
per-topic, I've checked this and it seems that they are only available globally.

This configuration is passed to the log cleaner via "CleanerConfig.scala", 
which only accepts global configurations. This seems intentional, as the log 
cleaner is not mutable and doesn't get instantiated that often. I think that 
changing this to accept per-topic configuration would be very nice, but perhaps 
as a part of a different KIP.


Following the Kafka documentation, these are the settings I'm referring to:

-- --

    Updating Log Cleaner Configs

    Log cleaner configs may be updated dynamically at cluster-default level
    used by all brokers. The changes take effect on the next iteration of log
    cleaning. One or more of these configs may be updated:

        * log.cleaner.threads
        * log.cleaner.io.max.bytes.per.second
        * log.cleaner.dedupe.buffer.size
        * log.cleaner.io.buffer.size
        * log.cleaner.io.buffer.load.factor
        * log.cleaner.backoff.ms

-- --



Please feel free to confirm, otherwise I will update the KIP to reflect these 
configuration nuances in the next few days.


Best Regards,

Luis



On Monday, July 9, 2018, 1:57:38 PM GMT+2, Andras Beni wrote:





Hi Luís,

Can you please clarify how the header value has to be encoded when the log
compaction strategy is 'header'? As far as I can see, the current PR reads a
varLong in CleanerCache.extractVersion, but reads a String and uses toLong in
Cleaner.extractVersion, while the KIP says no more than 'the header value
(which must be of type "long")'.
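
To make the ambiguity concrete, here is a minimal Java sketch of two plausible encodings of a "long" header value. All names are hypothetical, and the fixed 8-byte encoding is only a simple stand-in for a binary encoding such as varLong; the PR's actual methods are not reproduced here:

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; the real PR's methods are not reproduced here.
class HeaderVersionSketch {

    // Encoding A: the version as a decimal string in UTF-8, e.g. "42".
    public static byte[] encodeAsString(long version) {
        return Long.toString(version).getBytes(StandardCharsets.UTF_8);
    }

    // Encoding B: the version as a fixed 8-byte big-endian long
    // (a simple stand-in for a binary encoding such as a varLong).
    public static byte[] encodeAsBinary(long version) {
        return ByteBuffer.allocate(Long.BYTES).putLong(version).array();
    }

    public static long decodeString(byte[] value) {
        return Long.parseLong(new String(value, StandardCharsets.UTF_8));
    }

    public static long decodeBinary(byte[] value) {
        return ByteBuffer.wrap(value).getLong();
    }

    public static void main(String[] args) {
        long version = 42L;
        System.out.println(decodeString(encodeAsString(version))); // 42
        System.out.println(decodeBinary(encodeAsBinary(version))); // 42

        // Mixing the two: the 2 bytes of "42" are too short to be read as a long.
        try {
            decodeBinary(encodeAsString(version));
        } catch (BufferUnderflowException e) {
            System.out.println("mismatched encoding: " + e.getClass().getSimpleName());
        }
    }
}
```

Whichever encoding the KIP settles on, producers and both extraction paths have to agree on it.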

Otherwise +1 for the KIP

As for the current implementation: it seems that the header key "version" is
hardwired in the Cleaner class.

Andras



On Fri, Jul 6, 2018 at 10:36 PM Jun Rao  wrote:

> Hi, Guozhang,
>
> For #4, what you suggested could make sense for timestamp based de-dup, but
> not sure how general it is since the KIP also supports de-dup based on
> header.
>
> Thanks,
>
> Jun
>
> On Fri, Jul 6, 2018 at 1:12 PM, Guozhang Wang  wrote:
>
> > Hello Jun,
> > Thanks for your feedback. I'd agree on #3 that it's worth adding a
> special
> > check to not delete the last message, since although unlikely, it is
> still
> > possible that a new active segment gets rolled out but contains no data
> > yet, and hence the actual last message in this case would be in a
> > "compact-able" segment.
> >
> > For the second part of #4 you raised, maybe we could educate users to
> set "
> > message.timestamp.difference.max.ms" to be no larger than "
> > log.cleaner.delete.retention.ms" (its default value is Long.MAX_VALUE)?
> A
> > more aggressive approach would be changing the default value of the
> former
> > to be the value of the latter if:
> >
> > 1. cleanup.policy = compact OR compact,delete
> > 2. log.cleaner.compaction.strategy != offset
> >
> > Because in this case I think it makes sense to really allow users send
> any
> > data longer than "log.cleaner.delete.retention.ms", WDYT?
> >
> >
> > Guozhang
> >
> >
> > On Fri, Jul 6, 2018 at 11:51 AM, Jun Rao  wrote:
> >
> > > Hi, Luis,
> > >
> > > 1. The cleaning policy is configurable at both global and topic level.
> > The
> > > global one has the name log.cleanup.policy and the topic level has the
> > name
> > > cleanup.policy by just stripping the log prefix. We can probably do the
> > > same for the new configs.
> > >
> > > 2. Since this KIP may require an admin to configure a larger dedup
> buffer
> > > size, it would be useful to document this impact in the wiki and the
> > > release notes.
> > >
> > > 3. Yes, it's unlikely for the last message to be removed in the current
> > > implementation since we never clean the active segment. However, in
> > theory,
> > > this can happen. So it would be useful to guard this explicitly.
> > >
> > > 4. Just thought about another issue. We probably want to be a bit
> careful
> > > with key deletion. Currently, one can delete a key by sending a message
> > > with a delete tombstone (a null payload). To prevent a reader from
> > missing
> > > a deletion if it's removed too quickly, we depend on a configuration
> > > log.cleaner.delete.retention.ms (defaults to 1 day). The delete
> > tombstone
> > > will only be physically removed from the log after that amount of time.
> > The
> > > expectation is that a reader should finish reading to the end of the
> log
> > > after consuming a message within that configured time. With the new
> > > strategy, we have similar, but slightly different problems. The first
> > > problem is that the delete tombstone may be delivered earlier than an
> > > outdated record in offset order to a consumer. In order for the
> consumer
> > > not to take the outdated record, the consumer should cache the deletion
> > > tombstone for some configured amount of time. We can probably piggyback
> > this
> > > on 

Re: [DISCUSS] KIP-342 Add Customizable SASL extensions to OAuthBearer authentication

2018-08-09 Thread Rajini Sivaram
Hi Ron/Stanislav,

OK, let's just go with 2. I think it would be better to add an
OAuth-specific extensions handler OAuthBearerExtensionsValidatorCallback that
provides OAuthBearerToken.

To summarise, we chose option 2 out of these four options:

   1. {OAuthBearerValidatorCallback, SaslExtensionsValidatorCallback}: We
   don't want to use multiple ordered callbacks since we don't want the
   context of one callback to come from another callback.
   2. OAuthBearerExtensionsValidatorCallback(OAuthBearerToken token,
   SaslExtensions ext): This allows extensions to be validated using
   context from the token, we are ok with this.
   3. SaslExtensionsValidatorCallback(Map context,
   SaslExtensions ext): This doesn't really offer any real advantage over 2.
   4. OAuthBearerValidatorCallback(String token, SaslExtensions ext): We
   don't want token validator to see extensions since these are insecure but
   token validation needs to be secure. So we prefer to use a second callback
   handler to validate extensions after securely validating token.
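
A rough Java sketch of what option 2 could look like. Every type below is a simplified, hypothetical stand-in (a plain Token class and a String map instead of OAuthBearerToken and SaslExtensions), so this illustrates the shape of the idea only, not the API the KIP will define:

```java
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;

// Hypothetical sketch of option 2; all types are simplified stand-ins.
class ExtensionsCallbackSketch {

    // Stand-in for an already validated OAuthBearerToken.
    static final class Token {
        final String principal;
        Token(String principal) { this.principal = principal; }
    }

    // Carries the validated token as context plus the (untrusted) extensions.
    static final class ExtensionsValidatorCallback implements Callback {
        final Token token;
        final Map<String, String> extensions;
        final Map<String, String> validated = new HashMap<>();
        ExtensionsValidatorCallback(Token token, Map<String, String> extensions) {
            this.token = token;
            this.extensions = extensions;
        }
    }

    // Example handler: accepts a "tenant" extension only for the principal "alice".
    static final class Handler implements CallbackHandler {
        @Override
        public void handle(Callback[] callbacks)
                throws IOException, UnsupportedCallbackException {
            for (Callback cb : callbacks) {
                if (!(cb instanceof ExtensionsValidatorCallback)) {
                    throw new UnsupportedCallbackException(cb);
                }
                ExtensionsValidatorCallback ext = (ExtensionsValidatorCallback) cb;
                String tenant = ext.extensions.get("tenant");
                if (tenant != null && "alice".equals(ext.token.principal)) {
                    ext.validated.put("tenant", tenant);
                }
            }
        }
    }

    // Runs the extensions callback on its own, after token validation is done.
    public static Map<String, String> validate(String principal,
                                               Map<String, String> extensions) {
        ExtensionsValidatorCallback cb =
                new ExtensionsValidatorCallback(new Token(principal), extensions);
        try {
            new Handler().handle(new Callback[] {cb});
        } catch (IOException | UnsupportedCallbackException e) {
            throw new IllegalStateException(e);
        }
        return cb.validated;
    }

    public static void main(String[] args) {
        Map<String, String> ext = Collections.singletonMap("tenant", "acme");
        System.out.println(validate("alice", ext)); // {tenant=acme}
        System.out.println(validate("bob", ext));   // {}
    }
}
```

The token reaches the extensions validator as typed context on the callback itself, so no ordering between callbacks in one array is needed.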



On Wed, Aug 8, 2018 at 8:52 PM, Ron Dagostino  wrote:

> Hi Rajini.  I think we are considering the following two options.  Let me
> try to describe them along with their relative advantages/disadvantages.
>
> Option #1: Send two callbacks in a single array to the callback handler:
> ch.handle(new Callback[] {tokenCallback, extensionsCallback});
>
> Option #2: Send two callbacks separately, in two separate arrays, to the
> callback handler:
> ch.handle(new Callback[] {tokenCallback});
> ch.handle(new Callback[] {extensionsCallback});
>
> I actually don't see any objective disadvantage with #1.  If we don't get
> an exception then we know we have the information we need; if we do get an
> exception then we can tell if the first callback was handled because either
> its token() method or its errorStatus() method will return non-null; if
> both return null then we just send the token callback by itself and we
> don't publish any extension as negotiated properties.  There is no
> possibility of partial results, and I don't think there is a performance
> penalty due to potential re-validation here, either.
>
> I  see a subjective disadvantage with #1.  It feels awkward to me to
> provide the token as context for extension validation via the first
> callback.
>
> Actually, it just occurred to me why it feels awkward, and I think this is
> an objective disadvantage of this approach.  It would be impossible to do
> extension validation in such a scenario without also doing token validation
> first.  We are using the first callback as a way to provide context, but we
> are also using that first callback to request token validation.  We are
> complecting two separate things -- context and a request for validation --
> into one thing, so this approach has an element of complexity to it.
>
> The second option has no such complexity.  If we want to provide context to
> the extension validation then we do that by adding a token to the
> extensionsCallback instance before we provide it to the callback handler.
> How we do that -- whether by Map or via a typed getter --
> feels like a subjective decision, and assuming you agree with the
> complexity argument and choose option #2, I would defer to your preference
> as to how to implement it.
>
> Ron
>
>
>
>
>
>
>
>
>
>
>
>
> On Wed, Aug 8, 2018 at 3:10 PM Rajini Sivaram 
> wrote:
>
> > Hi Ron,
> >
> > Yes, I was thinking of a SaslExtensionsValidatorCallback with additional
> > context as well initially, but I didn't like the idea of name-value pairs
> > and I didn't want generic objects passed around through the callback. So
> > providing context through other callbacks felt like a neater fit. There
> > are pros and cons for both approaches, so we could go with either.
> >
> > Callbacks are provided to callback handlers in an array and there is
> > implicit ordering in the callbacks provided to the callback handler.
> > In the typical example of {NameCallback, PasswordCallback}, we expect
> that
> > ordering so that password callback knows what the user name is. Kafka
> > guarantees ordering of server callbacks in each of its SASL mechanisms
> and
> > this is explicitly stated in
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 86%3A+Configurable+SASL+callback+handlers
> > .
> > Until now, we didn't need to worry about ordering for OAuth.
> >
> > We currently do not have any optional callbacks - configured callback
> > handlers have to process all the callbacks for the mechanism or else we
> > fail authentication. We have to make SaslExtensionValidationCallback an
> > exception, at least for backward compatibility. We will only include this
> > callback if the client provided some extensions. I think it is reasonable
> > to expect that in general, custom callback handlers will handle this
> > callback if clients are likely to set extensions.  In case it doesn't, we
> > don't want to make any 

Build failed in Jenkins: kafka-trunk-jdk8 #2881

2018-08-09 Thread Apache Jenkins Server
See 


Changes:

[mjsax] MINOR: Fixed log in Topology Builder. (#5477)

--
[...truncated 876.86 KB...]
kafka.zookeeper.ZooKeeperClientTest > testExistsNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetDataNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetDataNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testConnectionTimeout STARTED

kafka.zookeeper.ZooKeeperClientTest > testConnectionTimeout PASSED

kafka.zookeeper.ZooKeeperClientTest > 
testBlockOnRequestCompletionFromStateChangeHandler STARTED

kafka.zookeeper.ZooKeeperClientTest > 
testBlockOnRequestCompletionFromStateChangeHandler PASSED

kafka.zookeeper.ZooKeeperClientTest > testUnresolvableConnectString STARTED

kafka.zookeeper.ZooKeeperClientTest > testUnresolvableConnectString PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testPipelinedGetData STARTED

kafka.zookeeper.ZooKeeperClientTest > testPipelinedGetData PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChildChangeHandlerForChildChange 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChildChangeHandlerForChildChange 
PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNodeWithChildren 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNodeWithChildren 
PASSED

kafka.zookeeper.ZooKeeperClientTest > testSetDataExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testSetDataExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testMixedPipeline STARTED

kafka.zookeeper.ZooKeeperClientTest > testMixedPipeline PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetDataExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetDataExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testDeleteExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testDeleteExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testSessionExpiry STARTED

kafka.zookeeper.ZooKeeperClientTest > testSessionExpiry PASSED

kafka.zookeeper.ZooKeeperClientTest > testSetDataNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testSetDataNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testDeleteNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testDeleteNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testExistsExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testExistsExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperStateChangeRateMetrics 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperStateChangeRateMetrics PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDeletion STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDeletion PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetAclNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetAclNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testStateChangeHandlerForAuthFailure 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testStateChangeHandlerForAuthFailure 
PASSED

kafka.network.SocketServerTest > testGracefulClose STARTED

kafka.network.SocketServerTest > testGracefulClose PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone PASSED

kafka.network.SocketServerTest > controlThrowable STARTED

kafka.network.SocketServerTest > controlThrowable PASSED

kafka.network.SocketServerTest > testRequestMetricsAfterStop STARTED

kafka.network.SocketServerTest > testRequestMetricsAfterStop PASSED

kafka.network.SocketServerTest > testConnectionIdReuse STARTED

kafka.network.SocketServerTest > testConnectionIdReuse PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testProcessorMetricsTags STARTED

kafka.network.SocketServerTest > testProcessorMetricsTags PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > testConnectionId STARTED

kafka.network.SocketServerTest > testConnectionId PASSED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics STARTED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics PASSED

kafka.network.SocketServerTest > testNoOpAction STARTED

kafka.network.SocketServerTest > testNoOpAction PASSED

kafka.network.SocketServerTest > simpleRequest STARTED


Re: Kafka stream - Internal topic name and schema avro compatibility

2018-08-09 Thread Cedric BERTRAND
Thanks John and Adam for your answer,

After investigation, I am in exactly the case you describe, John.
After a modification in my topology, a KEY-SELECT processor got the same
number as an old KEY-SELECT processor, and with it the associated
repartition topic.
We use the app reset tool to clean all internal topics, but the tool doesn't
clean the schema registry.

I see 2 solutions to solve this problem when it occurs.

1. Clean all internal topics and subjects in the schema registry
The problem with this solution is that I also clean the internal changelog
topics. Sometimes I don't want to lose this internal state.

2. Don't use the schema registry for internal topics (the solution proposed
by Adam)
Without the schema registry, do I send the whole object (data + Avro schema)
into Kafka?
What about performance with this solution?


Giving an explicit name to every operator seems to be an
interesting way to solve this problem.

I found this KIP which proposes to implement this solution.

KIP-307: Allow to define custom processor names with KStreams DSL


I know that the probability of a KEY-SELECT node getting the same number as
an old one is very low.
But when it occurs, it's extremely hard to understand.
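
A simplified Java model of how such a collision can arise, assuming node names are built from the operator type plus a counter in build order. The class, the method and the exact name format are illustrative assumptions, not the Kafka Streams implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of counter-based naming; not the Kafka Streams implementation.
class NodeNamingSketch {

    // Names each operator from its type plus its position in build order.
    public static List<String> names(String... operatorTypes) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < operatorTypes.length; i++) {
            result.add(String.format("KSTREAM-%s-%010d", operatorTypes[i], i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Version 1: a key select (say, on "customerId") is the third operator.
        List<String> v1 = names("SOURCE", "FILTER", "KEY-SELECT");
        // Version 2: the filter is replaced by a map, and a different key select
        // (say, on "orderId") now sits in the third position.
        List<String> v2 = names("SOURCE", "MAP", "KEY-SELECT");

        System.out.println(v1.get(2)); // KSTREAM-KEY-SELECT-0000000002
        System.out.println(v2.get(2)); // KSTREAM-KEY-SELECT-0000000002
        // Same name, so the same repartition topic and registry subject are
        // reused, although the key type may have changed.
        System.out.println(v1.get(2).equals(v2.get(2))); // true
    }
}
```

With explicit operator names, as KIP-307 proposes, the name would no longer depend on build order.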

Thanks for your time,

Cédric





Le mer. 8 août 2018 à 22:34, John Roesler  a écrit :

> Hi Cédric,
>
> The suffix is generated when we build the topology in such a way to
> guarantee each node/internal-topic/state-store gets a unique name.
>
> Generally speaking, it is unsafe to modify the topology and restart it. We
> recommend using the app reset tool whenever you update your topology.
>
> That said, some changes to the topology might be safe, so your mileage may
> vary; just be aware that changing the topology in place will potentially
> produce corrupt data.
>
> The main example I'd give is if you were to restructure your topology and
> you wind up with some other node type, like a "KSTREAM-TRANSFORM-" getting
> number 99, then you won't have a problem. The new node will create whatever
> internal state/topics are needed with a non-colliding name. But if you
> restructured the topology and a *different* key select happened to get
> number 99, then you'd have a big problem. Streams would have no idea that
> the existing repartition topic was for a different key select; it would
> just start using the existing topic. But this means that the repartition
> topic would be half one set of data and half another. Clearly, this is not
> good.
>
> It sounds to me like this is maybe what happened to you.
>
> We have been discussing various mechanisms by which we could support
> modifying the topology in place. Typically, this would involve giving each
> operator a semantic name so that the internal names would be related to
> what the nodes are doing, not the order in which the nodes are created.
>
> At the very least, we'd like to have some way of detecting that the
> topology has changed during a restart and refusing to start up, to protect
> the integrity of your data.
>
> I hope this helps,
> -John
>
> On Wed, Aug 8, 2018 at 12:51 PM Adam Bellemare 
> wrote:
>
> > Hi Cédric
> >
> > I do not know how the topology names are chosen, but provided that you
> > didn't change any of the topology then new topics will not be created or
> > require alteration.
> >
> > If you modify the topology then the naming can indeed change, but it
> would
> > then create a new internal topic and there would be no compatibility
> issue.
> > It could very well be that your topology was modified in such a way that
> > another, different internal topic is attempting to register an
> incompatible
> > schema. In this case though, I would expect that the error information
> > returned from the schema registry registration process to highlight
> exactly
> > what the failure is. It has been a while since we ran into one of these
> so
> > I could be wrong on that front though.
> >
> > My recommendation to you is to create a simple "InternalSerde" for your
> > Avro classes used in internal topics, such that you do *not* register
> them
> > to the schema registry. I have found that registering internal topics to
> > the schema registry turns it into a garbage dump and prevents developers
> > from making independent changes to their internal schemas. The rule of
> > thumb we use is that we only register schemas to the schema registry when
> > the events leave the application's bounded context - ie: final output
> > events only.
> >
> > Hope this helps,
> >
> > Adam
> >
> >
> >
> >
> >
> > On Wed, Aug 8, 2018 at 11:14 AM, Cedric BERTRAND <
> > bertrandcedric@gmail.com> wrote:
> >
> > > Within the Kafka Streams topology, internal topics are created.
> > > For these internal topics, Avro schemas for the key and value are
> > > registered in the schema registry.
> > >
> > > For the topic
> internal-MYAPPS-KSTREAM-KEY-SELECT-99-repartition,
> > I
> 

Jenkins build is back to normal : kafka-trunk-jdk10 #389

2018-08-09 Thread Apache Jenkins Server
See