Re: [DISCUSS] KIP-664: Provide tooling to detect and abort hanging transactions

2020-08-27 Thread Guozhang Wang
Hi Jason,

Thanks for writing the KIP. I think this is going to be a very useful tool
for operations: with EOS at its current stage we cannot confidently assert
that we are bug-free, and even once we are confident, the tool will still be
valuable for older-versioned brokers.
Regarding the solution, I've also debated whether Kafka should "self-heal"
automatically when it detects such situations, or whether we should instead
build ecosystem tooling and let operators handle it. I've convinced myself
that the latter is the better approach, since it keeps the Kafka software
itself simpler.

Regarding the KIP itself, I have a few meta comments below:

1. I'd like to clarify how we can make "--abort" work with old brokers:
without the additional "Partitions" field, the tool needs to set the
coordinator epoch correctly instead of "-1". Arguably that's still doable,
but it would require different call paths, and it's not clear whether that's
worth doing for old versions.

2. Why do we have to enforce that "DescribeProducers" is sent only to leaders
while ListTransactions can be sent to any broker? Or is it really
"ListTransactions is sent to coordinators only"? From the workflow you've
described, based on the results of DescribeProducers we should immediately
send ListTransactions to the corresponding coordinators for the collected
producer ids, instead of trying arbitrary brokers, right?

Also, I'm a bit concerned that "ListTransactions" could return too much data
when "StateFilters" is set to all states, including completed ones. Do we
expect users to ever want to know about transactions that are not pending? On
the other hand, maybe we can just require users to specify the "pids[]" in
this request as well, to filter out the uninteresting transactions. This also
fits the workflow: from "DescribeProducers" we know exactly which pids we are
diagnosing, so the follow-up "ListTransactions" should only need to cover
those pids.

3. One thing I'm a bit hesitant about: is `Describe` permission on the
associated topic sufficient to let any user get all the producer information
for the specific topic-partitions, including last timestamp,
txn-start-timestamp, etc., which may be considered sensitive? Should we
instead require "ClusterAction" so that only operators are allowed?

Below are more detailed comments:

4. From the example, it seems "TxnStartOffset" should be included in the
DescribeTransaction response schema? Otherwise the user would not have it for
the subsequent WriteTxnMarker request.

5. It would be a bit easier for readers if the added fields in the existing
WriteTxnMarkerRequest were highlighted (btw, my read is that we are only
adding "Partitions" with the starting offset, right?). Also, as for its
response, it seems we make no schema changes other than adding one more
potential error code, "INVALID_TXN_STATE", right? If that's the case, we can
just state it explicitly.

6. For the overloads where the following option classes are not specified, it
is not clear to me what the default options should be:

* ListTransactionsOptions
* DescribeTransactionsOptions
* DescribeProducersOptions

Also, it seems AbortTransactionOptions would just be empty? If so, do we
really need this option class for now?
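
For reference, what I would expect is simply the existing Admin convention,
where the overload without options delegates to a default options instance.
A sketch, assuming the KIP follows that convention:

    // Sketch only, assuming the no-options overloads delegate to freshly
    // constructed default options objects as the existing Admin methods do.
    default ListTransactionsResult listTransactions() {
        return listTransactions(new ListTransactionsOptions());
    }

    default DescribeProducersResult describeProducers(Collection<TopicPartition> partitions) {
        return describeProducers(partitions, new DescribeProducersOptions());
    }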

7. A couple of questions on the cmd tool examples:
7.1 Is "--broker" required or optional in "--find-hanging"? (If optional, I
presume we would just query all brokers iteratively.)
7.2 It seems "list-producers" is not exposed as a standalone feature in the
cmd tool but is only used inside "--find-hanging"; is that intentional?
Personally I feel exposing a "--list-producers" would be useful too: if we
believe the user has the right ACL, it is legitimate to return the producer
information to them anyway. But that depends on meta point 3) above.
7.3 "Describing Transactions": we should also explain how this would be
executed; at the very least, clarify that we would first find the coordinator
based on the transactional.id, so users do not need to specify one.
7.4 In "Aborting Transactions", should we also make the "--broker" option
required? Otherwise we would not know which broker to send the request to.


Overall, a nicely written KIP. Thanks Jason.

Guozhang


On Thu, Aug 27, 2020 at 11:44 AM Lucas Bradstreet 
wrote:

> >> Would it be worth returning transactional.id.expiration.ms in the
> DescribeProducersResponse?
>
> > That's an interesting thought as well. Are you trying to avoid the need
> to
> specify it through the command line? The tool could also query the value
> with DescribeConfigs I suppose.
>
> Basically. I'm not sure how useful this will be in practice, though it
> might help when debugging.
>
> Lucas
>
> On Thu, Aug 27, 2020 at 11:00 AM Jason Gustafson 
> wrote:
>
> > Hey Lucas,
> >
> > Thanks for the comments. Responses below:
> >
> > > Given that it's possible for replica producer states 

Re: [DISCUSS] KIP-656: MirrorMaker2 Exactly-once Semantics

2020-08-27 Thread Mickael Maison
Thanks Ning for the KIP. Having stronger guarantees when mirroring
data would be a nice improvement!

A few comments:
1. How does offset translation work with this new sink connector?
Should we also include a CheckpointSinkConnector?

2. Migrating to this new connector could be tricky as effectively the
Connect runtime needs to point to the other cluster, so its state
(stored in the __connect topics) is lost. Unfortunately there is no
easy way today to prime Connect with offsets. Not necessarily a
blocking issue, but this should be described, as I think the current
Migration section looks overly optimistic at the moment.

3. We can probably find a better name than "transaction.producer".
Maybe we can follow a pattern similar to Streams (which uses
"processing.guarantee")?

4. Transactional Ids used by the producer are generated based on the
task assignments. If there's a single task and it crashes and restarts,
it would still get the same id. Could this be an issue?

5. The logic in the KIP creates a new transaction every time put() is
called. Is there a performance impact?
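
To make the question concrete, this is roughly the per-put() pattern I am
referring to. It is a simplified sketch, not the KIP's actual code:
"producer" is assumed to be a transactional producer aimed at the target
cluster, toTargetRecord() is a placeholder conversion helper, and the offset
bookkeeping described in the KIP is omitted:

    @Override
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        producer.beginTransaction();                   // a new transaction for every put() batch
        try {
            for (SinkRecord record : records) {
                producer.send(toTargetRecord(record)); // mirror the record to the target cluster
            }
            producer.commitTransaction();
        } catch (KafkaException e) {
            producer.abortTransaction();
            throw e;
        }
    }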

On Fri, Aug 21, 2020 at 4:58 PM Ryanne Dolan  wrote:
>
> Awesome, this will be a huge advancement. I also want to point out that
> this KIP implements MirrorSinkConnector as well, finally, which is a very
> often requested missing feature in my experience.
>
> Ryanne
>
> On Fri, Aug 21, 2020, 9:45 AM Ning Zhang  wrote:
>
> > Hello, I wrote a KIP about MirrorMaker2 Exactly-once Semantics (EOS)
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-656%3A+MirrorMaker2+Exactly-once+Semantics
> > At the high-level, it resembles the idea of how HDFS Sink Connector
> > achieves EOS across clusters by managing and storing the consumer offsets
> > in an external persistent storage, but also leverages the current Kafka EOS
> > guarantee within a single cluster. I have done some experiments especially
> > for the failure cases and I am very appreciated for comments and feedback
> > on this KIP from bigger audience.
> >


[jira] [Created] (KAFKA-10441) Connect consumers are missing the fetch metrics

2020-08-27 Thread Pradyumna (Jira)
Pradyumna created KAFKA-10441:
-

 Summary: Connect consumers are missing the fetch metrics
 Key: KAFKA-10441
 URL: https://issues.apache.org/jira/browse/KAFKA-10441
 Project: Kafka
  Issue Type: Bug
  Components: consumer, KafkaConnect
Affects Versions: 2.3.0
Reporter: Pradyumna


Kafka connect workers create KafkaConsumers and consume from Kafka topics.

 

However, unlike regular KafkaConsumers which have a good set of metrics in the 
group consumer-fetch-manager-metrics (such as records-lag, 
records-consumed-total etc), these KafkaConsumers created by Kafka Connect do 
not have any of these consumer metrics.

 

I took a heap dump and saw that the "sensors" field in the metrics field of the 
KafkaConsumer objects created by Kafka Connect had only the connect metrics, 
and not the consumer fetch metrics.

 

I tried briefly to find out where in the Kafka code the metrics object is being
reset, or where the sensors field of the metrics object is overridden by Kafka
Connect, but I could not locate anything yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10440) MM2 using Kconnect Framework Creates topics recursively on source cluster

2020-08-27 Thread Nandini Anagondi (Jira)
Nandini Anagondi created KAFKA-10440:


 Summary: MM2 using Kconnect Framework Creates topics recursively 
on source cluster
 Key: KAFKA-10440
 URL: https://issues.apache.org/jira/browse/KAFKA-10440
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.5.0
Reporter: Nandini Anagondi
 Fix For: 2.5.0
 Attachments: MirrorSourceConnector-config.json, 
MirrorSourceConnector-override-config.json, list_of_topics.txt

I found an issue while running MM2 on the Kafka Connect framework, based on the
configuration described in MirrorConnectorConfig
(https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java):

*source : localhost:9092, Alias : A*

*target : localhost:9093, Alias : B*

*Issue Faced:*

Topics from the source cluster are created on the target cluster, but instead of
producing the source topics' data to the target, the connector produces to
source-cluster topics, and those topics get created on the source because
*allowAutoTopicCreation=true*.

*How the issue is caused:*

Kafka Connect is running on the source cluster and the connectors are configured
from A->B. The Connect framework uses connect-distributed.properties to
initialise the producer and consumer configs, so the producer and consumer are
initialised with the source bootstrap servers, i.e. localhost:9092
("source.cluster.producer.bootstrap.servers": "localhost:9092",
"source.cluster.consumer.bootstrap.servers": "localhost:9092").

This configuration works perfectly fine with traditional connectors because they
produce to and consume from that same cluster, but the MM2 connectors are the
opposite. Consider this example.

Ideally MM2 should work like this:

 A                B
 test-mm2         A.test-mm2

Instead, it is creating topics recursively (please refer to the attachments):

 A                B
 A.test-mm2       A.A.test-mm2
 test-mm2         A.test-mm2

 

This applies to all topics where data needs to be copied; a topic without any
data does not trigger the issue.

*Fixing the issue:*

MirrorSourceConnector should produce to the destination cluster's topics while
copying data. The issue can be fixed by overriding bootstrap.servers: set
connector.client.config.override.policy to "All" in
connect-distributed.properties, and provide producer.override.bootstrap.servers
and consumer.override.bootstrap.servers (matching
target.cluster.bootstrap.servers) in the connector payload. With that, the
connectors produce the data to the target cluster.

 A                B
 test-mm2         A.test-mm2
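
For illustration, the override can be expressed roughly like this in the
connector payload (key names are the standard Connect client override configs;
the values are just the ones from this setup and purely illustrative):

    // Illustrative sketch of the override described above; it also requires
    // connector.client.config.override.policy=All in connect-distributed.properties.
    Map<String, String> connectorConfig = new HashMap<>();
    connectorConfig.put("connector.class", "org.apache.kafka.connect.mirror.MirrorSourceConnector");
    connectorConfig.put("producer.override.bootstrap.servers", "localhost:9093"); // target cluster B
    connectorConfig.put("consumer.override.bootstrap.servers", "localhost:9093"); // per target.cluster.bootstrap.servers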

 

List of the topics in both cases: [^list_of_topics.txt]

You can find the config in both cases here:

[^MirrorSourceConnector-config.json] - Config that matches with 
MirrorConnectorConfig

[^MirrorSourceConnector-override-config.json] - Config that works

 

To reproduce the scenario: start Kafka Connect on the source cluster, configure
the connector with MirrorSourceConnector-config.json, create a topic, and
produce data to it. Check the list of topics after 5-10 seconds.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-635: GetOffsetShell: support for multiple topics and consumer configuration override

2020-08-27 Thread Dániel Urbán
Hi all,

Please vote if you'd like to see this implemented. This one fixes some
long-standing debt; it would be nice to see it pass.

Thank you,
Daniel

Dániel Urbán  ezt írta (időpont: 2020. aug. 18., K,
14:06):

> Hello everyone,
>
> Please, if you are interested in this KIP and PR, don't forget to vote.
>
> Thank you,
> Daniel
>
> Dániel Urbán  ezt írta (időpont: 2020. aug. 13.,
> Cs, 14:00):
>
>> Hi David,
>>
>> Thank you for the suggestion. KIP-635 was referencing the --broker-list
>> issue, but based on your suggestion, I pinged the PR
>> https://github.com/apache/kafka/pull/8123.
>> Since I got no response, I updated KIP-635 to deprecate --broker-list.
>> Will update the PR related to KIP-635 to reflect that change.
>>
>> Thanks,
>> Daniel
>>
>> David Jacot  ezt írta (időpont: 2020. aug. 10., H,
>> 20:48):
>>
>>> Hi Daniel,
>>>
>>> I was not aware of that PR. At minimum, I would add `--bootstrap-server`
>>> to the list in the KIP for completeness. Regarding the implementation,
>>> I would leave a comment in that PR asking if they plan to continue it. If
>>> not,
>>> we could do it as part of your PR directly.
>>>
>>> Cheers,
>>> David
>>>
>>> On Mon, Aug 10, 2020 at 10:49 AM Dániel Urbán 
>>> wrote:
>>>
>>> > Hi everyone,
>>> >
>>> > Just a reminder, please vote if you are interested in this KIP being
>>> > implemented.
>>> >
>>> > Thanks,
>>> > Daniel
>>> >
>>> > Dániel Urbán  ezt írta (időpont: 2020. júl.
>>> 31., P,
>>> > 9:01):
>>> >
>>> > > Hi David,
>>> > >
>>> > > There is another PR linked on KAFKA-8507, which is still open:
>>> > > https://github.com/apache/kafka/pull/8123
>>> > > Wasn't sure if it will go in, and wanted to avoid conflicts. Do you
>>> think
>>> > > I should do the switch to '--bootstrap-server' anyway?
>>> > >
>>> > > Thanks,
>>> > > Daniel
>>> > >
>>> > > David Jacot  ezt írta (időpont: 2020. júl.
>>> 30., Cs,
>>> > > 17:52):
>>> > >
>>> > >> Hi Daniel,
>>> > >>
>>> > >> Thanks for the KIP.
>>> > >>
>>> It seems that we have forgotten to include this tool in KIP-499.
>>> KAFKA-8507 is resolved, but this tool still uses the deprecated
>>> "--broker-list". I suggest including "--bootstrap-server" in your public
>>> interfaces as well and fixing this omission during the
>>> implementation.
>>> > >>
>>> > >> +1 (non-binding)
>>> > >>
>>> > >> Thanks,
>>> > >> David
>>> > >>
>>> > >> On Thu, Jul 30, 2020 at 1:52 PM Kamal Chandraprakash <
>>> > >> kamal.chandraprak...@gmail.com> wrote:
>>> > >>
>>> > >> > +1 (non-binding), thanks for the KIP!
>>> > >> >
>>> > >> > On Thu, Jul 30, 2020 at 3:31 PM Manikumar <
>>> manikumar.re...@gmail.com>
>>> > >> > wrote:
>>> > >> >
>>> > >> > > +1 (binding)
>>> > >> > >
>>> > >> > > Thanks for the KIP!
>>> > >> > >
>>> > >> > >
>>> > >> > >
>>> > >> > > On Thu, Jul 30, 2020 at 3:07 PM Dániel Urbán <
>>> urb.dani...@gmail.com
>>> > >
>>> > >> > > wrote:
>>> > >> > >
>>> > >> > > > Hi everyone,
>>> > >> > > >
>>> > >> > > > If you are interested in this KIP, please do not forget to
>>> vote.
>>> > >> > > >
>>> > >> > > > Thanks,
>>> > >> > > > Daniel
>>> > >> > > >
>>> > >> > > > Viktor Somogyi-Vass  ezt írta
>>> (időpont:
>>> > >> 2020.
>>> > >> > > > júl.
>>> > >> > > > 28., K, 16:06):
>>> > >> > > >
>>> > >> > > > > +1 from me (non-binding), thanks for the KIP.
>>> > >> > > > >
>>> > >> > > > > On Mon, Jul 27, 2020 at 10:02 AM Dániel Urbán <
>>> > >> urb.dani...@gmail.com
>>> > >> > >
>>> > >> > > > > wrote:
>>> > >> > > > >
>>> > >> > > > > > Hello everyone,
>>> > >> > > > > >
>>> > >> > > > > > I'd like to start a vote on KIP-635. The KIP enhances the
>>> > >> > > > GetOffsetShell
>>> > >> > > > > > tool by enabling querying multiple topic-partitions,
>>> adding
>>> > new
>>> > >> > > > filtering
>>> > >> > > > > > options, and adding a config override option.
>>> > >> > > > > >
>>> > >> > > > > >
>>> > >> > > > >
>>> > >> > > >
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-635%3A+GetOffsetShell%3A+support+for+multiple+topics+and+consumer+configuration+override
>>> > >> > > > > >
>>> > >> > > > > > The original discussion thread was named "[DISCUSS]
>>> KIP-308:
>>> > >> > > > > > GetOffsetShell: new KafkaConsumer API, support for
>>> multiple
>>> > >> topics,
>>> > >> > > > > > minimize the number of requests to server". The id had to
>>> be
>>> > >> > changed
>>> > >> > > as
>>> > >> > > > > > there was a collision, and the KIP also had to be
>>> renamed, as
>>> > >> some
>>> > >> > of
>>> > >> > > > its
>>> > >> > > > > > motivations were outdated.
>>> > >> > > > > >
>>> > >> > > > > > Thanks,
>>> > >> > > > > > Daniel
>>> > >> > > > > >
>>> > >> > > > >
>>> > >> > > >
>>> > >> > >
>>> > >> >
>>> > >>
>>> > >
>>> >
>>>
>>


Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-08-27 Thread Bruno Cadonna

Hi Wlaker (pun intended),

Thank you for your feedback! Please find my answers inline.

Best,
Bruno

On 26.08.20 18:46, Walker Carlson wrote:

Hello Burno,

Thanks for the KIP!

Not to pile on, but I also had a couple additional questions. I am not
super familiar with the StreamThread internals so please forgive any
misconceptions if these are not relevant questions.

1. In requestClose(), if a thread does not close properly and deadlocks for
some reason, how will we avoid the client waiting on the thread to close,
like when you try to close the Kafka Streams client from a thread's
UncaughtExceptionHandler now?

The KIP said it would improve the handling of these conditions, however I
did not find it clear what strategy this improvement would use. Maybe
handling broken threads is out of the scope of this KIP, or am I missing
something?



The main goal of requestClose() is to avoid the deadlock when closing 
the Kafka Streams client from the uncaught exception handler of a stream 
thread. The case in which the close may encounter broken stream threads 
is orthogonal to this KIP, IMO.
But as stated in my previous e-mail, I will rethink the necessity of a 
method requestClose().



2a. Will the removal of Stream Threads in state DEAD be automatic? And will
it be for all in that state or just for those closed with
shutDownStreamThread?



Yes, it would be automatic and it would be for all. I will add some words 
about that in the KIP.




2b. From the wording it seems that removing DEAD threads from the Kafka
Streams client will be a new feature of this KIP. If that is the case, is
there a reasonable possibility that keeping the dead threads in the metadata
might be useful? For example, if a thread is continually erroring and
restarting a replacement.



Yes, that is a good question. I do not know how reasonable it is to keep 
DEAD stream threads around. However, what we could do is introduce a 
metric in the KIP to keep track of the stream threads that have died. 
IMO, such a metric is better than keeping DEAD stream threads around. I 
will add the metric to the KIP.




3. Maybe instead of addThread() we could use startNewThread()?
I agree with John that startStreamThread could easily be misinterpreted
i.e. as startStreamThreads.



I will change the names in the KIP.


Thanks,
Walker

On Wed, Aug 26, 2020 at 8:48 AM John Roesler  wrote:


Hi Bruno,

Thanks for the well-motivated and thorough KIP!

It's a good point that the record cache should be re-
distributed over the threads.

Reading your KIP leads me to a few questions:

1. Start vs. Add

Maybe this is paranoid, but I'm mildly concerned that users
who don't read the docs too carefully might think they need
to call "start thread" to start their application's threads
after calling `KafkaStreams.start` to start the app.
Definitely not sure about this, but I'm wondering if it
would be more clear to say `addThread` and
`remove/dropThread` to make it clear that we are adding to
or subtracting from the total number of threads, not just
starting and stopping threads that are already in the pool.

2. requestClose() vs. close(Duration.ZERO)

It's a very good point about deadlocks. Can you explain why
we need a new method, though? The specified behavior seems
the same as `close(Duration.ZERO)`.

3. Thread Naming

Maybe this point is silly, but how will newly added threads
be numbered? Will dead and hence removed threads' names be
reused? Or will there be a monotonic counter for the
lifetime of the instance?

It seems relevant to mention this because it will affect
metrics and logs. I guess _maybe_ it would be nice for the
thread that replaces a crashed thread to take over its name,
but since the crashed thread still exists while the
UncaughtExceptionHandler is executing, its name wouldn't be
up for grabs in any case yet.

On the other hand, it might be nicer for operators to be
able to distinguish the logs/metrics of the replacement
thread from the dead one, so not reusing thread names might
be better.

On the other hand, not reusing thread names in a
"replacement" exception handler means that a crashy
application would report an unbounded number of thread ids
over its lifespan. This might be an issue for people using
popular metrics aggregation services that charge per unique
combination of metrics tags. Then again, maybe this is a
pathological case not worth considering.

And yes, I realized I just implied that I have three hands.

4. ERROR State

Can you elaborate why users explicitly stopping all threads
should put the application into ERROR state? It does seem
like it's not exactly "running" at that point, but it also
doesn't seem like an error.

Right now, ERROR is a terminal state that indicates users
must discard the application instance and create a new one.
If there is still a possibility that we'd need a terminally
corrupted state, it would probably be a mistake to add an
out-transition from it.

The documentation on that state says that it happens when
all 

[jira] [Created] (KAFKA-10442) Tooling to detect and abort hanging transactions (KIP-664)

2020-08-27 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10442:
---

 Summary: Tooling to detect and abort hanging transactions (KIP-664)
 Key: KAFKA-10442
 URL: https://issues.apache.org/jira/browse/KAFKA-10442
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


This JIRA tracks the implementation of KIP-664: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-08-27 Thread Bruno Cadonna

Hi John,

Thank you for your feedback!

Please find my answers inline.

Best,
Bruno

On 26.08.20 17:49, John Roesler wrote:

Hi Bruno,

Thanks for the well-motivated and thorough KIP!

It's a good point that the record cache should be re-
distributed over the threads.

Reading your KIP leads me to a few questions:

1. Start vs. Add

Maybe this is paranoid, but I'm mildly concerned that users
who don't read the docs too carefully might think they need
to call "start thread" to start their application's threads
after calling `KafkaStreams.start` to start the app.
Definitely not sure about this, but I'm wondering if it
would be more clear to say `addThread` and
`remove/dropThread` to make it clear that we are adding to
or subtracting from the total number of threads, not just
starting and stopping threads that are already in the pool.



This makes sense to me. I will rename the methods.


2. requestClose() vs. close(Duration.ZERO)

It's a very good point about deadlocks. Can you explain why
we need a new method, though? The specified behavior seems
the same as `close(Duration.ZERO)`.



You are right, `close(Duration.ZERO)` would avoid the deadlock. However,
`close(Duration.ZERO)` does not guarantee that all resources get closed.
Furthermore, it is error-prone to rely on users to pick the correct
overload. One possibility to prevent users from using the wrong overload is
to check in `close()` whether the calling thread is a stream thread and, in
that case, to call `close(0)` instead of `close(Long.MAX_VALUE)`. Maybe I
could also find a solution for the resource issue. Let me think about it.
In conclusion, I agree that a new method may not be needed.
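
For reference, a minimal sketch of the scenario we are discussing, assuming
`streams` is the KafkaStreams client and with logging simplified. Calling
close() with a non-zero timeout from inside the handler would make the stream
thread wait for its own termination, whereas close(Duration.ZERO) only
signals shutdown and returns immediately:

    // Sketch only: shutting the client down from a stream thread's uncaught
    // exception handler without waiting for the (dying) calling thread itself.
    streams.setUncaughtExceptionHandler((thread, throwable) -> {
        System.err.println("Stream thread " + thread.getName() + " died: " + throwable);
        streams.close(Duration.ZERO);   // signal shutdown and return immediately
    });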



3. Thread Naming

Maybe this point is silly, but how will newly added threads
be numbered? Will dead and hence removed threads' names be
reused? Or will there be a monotonic counter for the
lifetime of the instance?

It seems relevant to mention this because it will affect
metrics and logs. I guess _maybe_ it would be nice for the
thread that replaces a crashed thread to take over its name,
but since the crashed thread still exists while the
UncaughtExceptionHandler is executing, its name wouldn't be
up for grabs in any case yet.

On the other hand, it might be nicer for operators to be
able to distinguish the logs/metrics of the replacement
thread from the dead one, so not reusing thread names might
be better.

On the other hand, not reusing thread names in a
"replacement" exception handler means that a crashy
application would report an unbounded number of thread ids
over its lifespan. This might be an issue for people using
popular metrics aggregation services that charge per unique
combination of metrics tags. Then again, maybe this is a
pathological case not worth considering.

And yes, I realized I just implied that I have three hands.



We definitely cannot reuse the name of a crashed stream thread immediately. 
What we could do is keep a list of previously used names (or secondhand 
names, to pick up your hand metaphor) that are now free and reuse them for 
new stream threads. If there are no secondhand names, a counter is increased 
and a new name is created. The list of secondhand names would be bounded by 
the maximum number of stream threads that were ever running concurrently. I 
guess that would not be too complex to implement and would avoid the 
pathological case. IMO, it would be less annoying to have a new stream 
thread's metrics show up in the graph of a stream thread that previously 
crashed than to run into the pathological case that costs money.
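
Just to illustrate the idea (this is a sketch, not the actual StreamThread
internals): freed names are reused before the counter grows, so the number of
distinct thread names stays bounded by the peak number of concurrently
running stream threads.

    class StreamThreadNames {
        private final java.util.Deque<String> secondhandNames = new java.util.ArrayDeque<>();
        private int counter = 0;
        private final String clientId;

        StreamThreadNames(final String clientId) { this.clientId = clientId; }

        synchronized String acquire() {
            return secondhandNames.isEmpty()
                ? clientId + "-StreamThread-" + (++counter)  // no freed name available
                : secondhandNames.pop();                     // reuse a freed name
        }

        synchronized void release(final String name) {
            secondhandNames.push(name);   // called once the DEAD thread has been removed
        }
    }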




4. ERROR State

Can you elaborate why users explicitly stopping all threads
should put the application into ERROR state? It does seem
like it's not exactly "running" at that point, but it also
doesn't seem like an error.

Right now, ERROR is a terminal state that indicates users
must discard the application instance and create a new one.
If there is still a possibility that we'd need a terminally
corrupted state, it would probably be a mistake to add an
out-transition from it.

The documentation on that state says that it happens when
all StreamThreads die or when the Global thread dies. We
have a proposal from Navinder (KIP-406) to allow the Global
thread to automatically come back to life after some errors,
but presumably others would still be fatal.

I guess your reasoning is that if the cause of the ERROR
state happens to be just from all the StreamThreads dying,
and if there's a way to replace them, then it should be
possible to recover from this ERROR state.

Maybe that makes sense, and we should just note that if the
Global thread is dead, it won't be possible to transition
out of ERROR state.



Here, I was torn between ERROR and RUNNING.
I guess the key question here is: What is the meaning of ERROR?
Is it a terminal state that signals a fatal error that cannot be 
recovered without a restart of the client? If yes, there should not be 
any transition from ERROR to any state. I 

[DISCUSS] KIP-654 Aborting Transaction with non-flushed data should throw a non-fatal Exception

2020-08-27 Thread Gokul Srinivas

Hello all,

I would like to propose the following KIP to throw a new non-fatal 
exception whilst aborting transactions with non-flushed data. This will 
help users distinguish non-fatal errors and potentially retry the batch.


*Issue *- https://issues.apache.org/jira/browse/KAFKA-10186 



*KIP *- 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-654:+Aborted+transaction+with+non-flushed+data+should+throw+a+non-fatal+exception 



Please let me know how best we can proceed with this.

-Gokul



Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-08-27 Thread Bruno Cadonna

Thank you Matthias for the feedback.

Please find my answers inline.

Best,
Bruno

On 26.08.20 19:54, Matthias J. Sax wrote:

Thanks for the KIP Bruno.

While reading it, I had the same questions as raised by John and Walker
(so I won't repeat them).

In addition, I think that adding/removing threads should only be allowed
if the client state is RUNNING (but not in any other state, maybe except
ERROR). Furthermore, it seems that the methods must be `synchronized`
similar to `start()` and `close()`.



Good point about only adding stream threads in client state RUNNING. I 
will add that to the KIP.



IMO `synchronized` is an implementation detail that we should discuss on 
the PR.




While I understand that the current `close(Duration.ZERO)` is not the same
as `requestClose()`, I am wondering if we should change the semantics of
`close()` instead of adding a new method though?



As I wrote in my previous e-mails, I will rethink the addition of method 
requestClose().




Btw: for thread naming, I personally think that just using a counter (as
we do right now) might be ok. If this becomes an issue, we could improve
it later.


-Matthias

On 8/26/20 9:46 AM, Walker Carlson wrote:

Hello Burno,

Thanks for the KIP!

Not to pile on, but I also had a couple additional questions. I am not
super familiar with the StreamThread internals so please forgive any
misconceptions if these are not relevant questions.

1. In requestClose(), if a thread does not close properly and deadlocks for
some reason, how will we avoid the client waiting on the thread to close,
like when you try to close the Kafka Streams client from a thread's
UncaughtExceptionHandler now?

The KIP said it would improve the handling of these conditions, however I
did not find it clear what strategy this improvement would use. Maybe
handling broken threads is out of the scope of this KIP, or am I missing
something?

2a. Will the removal of Stream Threads in state DEAD be automatic? And will
it be for all in that state or just for those closed with
shutDownStreamThread?

2b. From the wording it seems that removing DEAD threads from the Kafka
Streams client will be a new feature of this KIP. If that is the case, is
there a reasonable possibility that keeping the dead threads in the metadata
might be useful? For example, if a thread is continually erroring and
restarting a replacement.

3. Maybe instead of addThread() we could use startNewThread()?
I agree with John that startStreamThread could easily be misinterpreted
i.e. as startStreamThreads.

Thanks,
Walker

On Wed, Aug 26, 2020 at 8:48 AM John Roesler  wrote:


Hi Bruno,

Thanks for the well-motivated and thorough KIP!

It's a good point that the record cache should be re-
distributed over the threads.

Reading your KIP leads me to a few questions:

1. Start vs. Add

Maybe this is paranoid, but I'm mildly concerned that users
who don't read the docs too carefully might think they need
to call "start thread" to start their application's threads
after calling `KafkaStreams.start` to start the app.
Definitely not sure about this, but I'm wondering if it
would be more clear to say `addThread` and
`remove/dropThread` to make it clear that we are adding to
or subtracting from the total number of threads, not just
starting and stopping threads that are already in the pool.

2. requestClose() vs. close(Duration.ZERO)

It's a very good point about deadlocks. Can you explain why
we need a new method, though? The specified behavior seems
the same as `close(Duration.ZERO)`.

3. Thread Naming

Maybe this point is silly, but how will newly added threads
be numbered? Will dead and hence removed threads' names be
reused? Or will there be a monotonic counter for the
lifetime of the instance?

It seems relevant to mention this because it will affect
metrics and logs. I guess _maybe_ it would be nice for the
thread that replaces a crashed thread to take over its name,
but since the crashed thread still exists while the
UncaughtExceptionHandler is executing, its name wouldn't be
up for grabs in any case yet.

On the other hand, it might be nicer for operators to be
able to distinguish the logs/metrics of the replacement
thread from the dead one, so not reusing thread names might
be better.

On the other hand, not reusing thread names in a
"replacement" exception handler means that a crashy
application would report an unbounded number of thread ids
over its lifespan. This might be an issue for people using
popular metrics aggregation services that charge per unique
combination of metrics tags. Then again, maybe this is a
pathological case not worth considering.

And yes, I realized I just implied that I have three hands.

4. ERROR State

Can you elaborate why users explicitly stopping all threads
should put the application into ERROR state? It does seem
like it's not exactly "running" at that point, but it also
doesn't seem like an error.

Right now, ERROR is a terminal state that indicates users
must discard the application instance 

Re: [VOTE] KIP-662: Throw Exception when Source Topics of a Streams App are Deleted

2020-08-27 Thread Walker Carlson
+1 (Non Binding). Good Kip Bruno

Walker

On Tue, Aug 25, 2020 at 11:17 AM Guozhang Wang  wrote:

> +1. Thanks Bruno!
>
>
> Guozhang
>
> On Tue, Aug 25, 2020 at 4:00 AM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > I would like to start the vote for
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted
> >
> > Best,
> > Bruno
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-662: Throw Exception when Source Topics of a Streams App are Deleted

2020-08-27 Thread Bill Bejeck
Thanks for the KIP Bruno.

+1 (binding)

-Bill

On Thu, Aug 27, 2020 at 3:15 PM Walker Carlson 
wrote:

> +1 (Non Binding). Good Kip Bruno
>
> Walker
>
> On Tue, Aug 25, 2020 at 11:17 AM Guozhang Wang  wrote:
>
> > +1. Thanks Bruno!
> >
> >
> > Guozhang
> >
> > On Tue, Aug 25, 2020 at 4:00 AM Bruno Cadonna 
> wrote:
> >
> > > Hi,
> > >
> > > I would like to start the vote for
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted
> > >
> > > Best,
> > > Bruno
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


[jira] [Created] (KAFKA-10443) Consider providing standard set of users in system tests

2020-08-27 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-10443:
-

 Summary: Consider providing standard set of users in system tests
 Key: KAFKA-10443
 URL: https://issues.apache.org/jira/browse/KAFKA-10443
 Project: Kafka
  Issue Type: Test
  Components: system tests
Reporter: Ron Dagostino


As part of the KIP-554 implementation we decided to exercise the AdminClient 
interface for creating SCRAM credentials within the system tests.  So instead 
of bootstrapping both the broker and the user credentials via ZooKeeper 
(`kafka-configs.sh --alter --zookeeper`) before the broker starts, we 
bootstrapped just the broker credential via ZooKeeper and then we started the 
brokers and created the user credential afterwards via the AdminClient 
(`kafka-configs.sh --alter --bootstrap-server`).  We did this by configuring 
the admin client to log in as the broker.  This works fine, but it feels like 
we should have a separate "admin" user available to do this rather than having 
to authenticate the admin client as the broker.  Furthermore, this feels like 
it might be a good pattern to consider everywhere -- whenever we create a 
broker user we should also create an admin user for tests that want/need to 
leverage it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-664: Provide tooling to detect and abort hanging transactions

2020-08-27 Thread Lucas Bradstreet
Hi Jason,

This looks like a very useful tool, thanks for writing it up.

Given that it's possible for replica producer states to diverge from each
other, it would be very useful if DescribeProducers(Request,Response) and
tooling is able to query all partition replicas for their producers. One
way I can see this being used immediately is in kafka's system tests,
especially the ones that inject failures. At the end of the test we can
query all replicas and make sure that their states have not diverged. I can
also see it being useful when debugging production clusters too.

Would it be worth returning transactional.id.expiration.ms in the
DescribeProducersResponse?

Cheers,

Lucas



On Wed, Aug 26, 2020 at 12:12 PM Ron Dagostino  wrote:

> Yes, that definitely sounds reasonable.  Thanks, Jason!
>
> Ron
>
> On Wed, Aug 26, 2020 at 3:03 PM Jason Gustafson 
> wrote:
>
> > Hey Ron,
> >
> > We do not typically backport new APIs to older versions. I think we can
> > however make the --abort command compatible with older versions. It would
> > require a user to do some analysis on their own to identify a hanging
> > transaction, but then they can use the tool from a new release to
> recover.
> > For example, users could detect a hanging transaction through the
> existing
> > "LastStableOffsetLag" metric and then collect the needed information
> from a
> > dump of the log (or producer snapshot). It's more work, but at least it's
> > possible. Does that sound fair?
> >
> > Thanks,
> > Jason
> >
> > On Wed, Aug 26, 2020 at 11:51 AM Ron Dagostino 
> wrote:
> >
> > > Hi Jason.  Thanks for the excellently-written KIP.
> > >
> > > Will the implementation be backported to prior Kafka versions?  The
> > reason
> > > I ask is because if it is not backported and similar functionality is
> not
> > > otherwise made available for older versions, then the only recourse
> > (aside
> > > from deleting and recreating the topic as you pointed out) may be to
> > > upgrade to 2.7 (or whatever version ends up getting this
> functionality).
> > > Such an upgrade may not be desirable, especially if the number of
> > > intermediate versions is considerable. I understand the mantra of
> "never
> > > fall too many versions behind" but the reality of it is that it isn't
> > > always the case.  Even if the version is relatively recent, an upgrade
> > may
> > > still not be possible for some time, and a quicker resolution may be
> > > necessary.
> > >
> > > Ron
> > >
> > > On Wed, Aug 26, 2020 at 2:33 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I've added a proposal to handle the problem of hanging transactions:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions
> > > > .
> > > > In theory, this should never happen. In practice, we have hit one bug
> > > where
> > > > it was possible and there are few good options today to recover.
> Take a
> > > > look and let me know what you think.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > >
> >
>


Re: Access to submit KIP

2020-08-27 Thread Jun Rao
Hi, Mandeep,

Thanks for your interest. Just gave you the wiki permissions.

Jun

On Wed, Aug 26, 2020 at 9:08 PM mandeep gandhi 
wrote:

> Hi all,
>
> Please grant me access to create a KIP.
>
> Email - welcomemand...@gmail.com
> Username - ifconfig
>
> Thanks,
> Mandeep Gandhi
>


Re: [DISCUSS] KIP-664: Provide tooling to detect and abort hanging transactions

2020-08-27 Thread Jason Gustafson
Hey Lucas,

Thanks for the comments. Responses below:

> Given that it's possible for replica producer states to diverge from each
other, it would be very useful if DescribeProducers(Request,Response) and
tooling is able to query all partition replicas for their producers

Yes, it makes sense to me to let DescribeProducers work on both followers
and leaders. In fact, I'm encouraged that there are use cases for this work
other than detecting hanging transactions. That was indeed the hope, but I
didn't have anything specific in mind. I will update the proposal.

> Would it be worth returning transactional.id.expiration.ms in the
DescribeProducersResponse?

That's an interesting thought as well. Are you trying to avoid the need to
specify it through the command line? The tool could also query the value
with DescribeConfigs I suppose.

Thanks,
Jason

On Thu, Aug 27, 2020 at 10:48 AM Lucas Bradstreet 
wrote:

> Hi Jason,
>
> This looks like a very useful tool, thanks for writing it up.
>
> Given that it's possible for replica producer states to diverge from each
> other, it would be very useful if DescribeProducers(Request,Response) and
> tooling is able to query all partition replicas for their producers. One
> way I can see this being used immediately is in kafka's system tests,
> especially the ones that inject failures. At the end of the test we can
> query all replicas and make sure that their states have not diverged. I can
> also see it being useful when debugging production clusters too.
>
> Would it be worth returning transactional.id.expiration.ms in the
> DescribeProducersResponse?
>
> Cheers,
>
> Lucas
>
>
>
> On Wed, Aug 26, 2020 at 12:12 PM Ron Dagostino  wrote:
>
> > Yes, that definitely sounds reasonable.  Thanks, Jason!
> >
> > Ron
> >
> > On Wed, Aug 26, 2020 at 3:03 PM Jason Gustafson 
> > wrote:
> >
> > > Hey Ron,
> > >
> > > We do not typically backport new APIs to older versions. I think we can
> > > however make the --abort command compatible with older versions. It
> would
> > > require a user to do some analysis on their own to identify a hanging
> > > transaction, but then they can use the tool from a new release to
> > recover.
> > > For example, users could detect a hanging transaction through the
> > existing
> > > "LastStableOffsetLag" metric and then collect the needed information
> > from a
> > > dump of the log (or producer snapshot). It's more work, but at least
> it's
> > > possible. Does that sound fair?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Wed, Aug 26, 2020 at 11:51 AM Ron Dagostino 
> > wrote:
> > >
> > > > Hi Jason.  Thanks for the excellently-written KIP.
> > > >
> > > > Will the implementation be backported to prior Kafka versions?  The
> > > reason
> > > > I ask is because if it is not backported and similar functionality is
> > not
> > > > otherwise made available for older versions, then the only recourse
> > > (aside
> > > > from deleting and recreating the topic as you pointed out) may be to
> > > > upgrade to 2.7 (or whatever version ends up getting this
> > functionality).
> > > > Such an upgrade may not be desirable, especially if the number of
> > > > intermediate versions is considerable. I understand the mantra of
> > "never
> > > > fall too many versions behind" but the reality of it is that it isn't
> > > > always the case.  Even if the version is relatively recent, an
> upgrade
> > > may
> > > > still not be possible for some time, and a quicker resolution may be
> > > > necessary.
> > > >
> > > > Ron
> > > >
> > > > On Wed, Aug 26, 2020 at 2:33 PM Jason Gustafson 
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I've added a proposal to handle the problem of hanging
> transactions:
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions
> > > > > .
> > > > > In theory, this should never happen. In practice, we have hit one
> bug
> > > > where
> > > > > it was possible and there are few good options today to recover.
> > Take a
> > > > > look and let me know what you think.
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-664: Provide tooling to detect and abort hanging transactions

2020-08-27 Thread Lucas Bradstreet
>> Would it be worth returning transactional.id.expiration.ms in the
DescribeProducersResponse?

> That's an interesting thought as well. Are you trying to avoid the need to
specify it through the command line? The tool could also query the value
with DescribeConfigs I suppose.

Basically. I'm not sure how useful this will be in practice, though it
might help when debugging.

Lucas

On Thu, Aug 27, 2020 at 11:00 AM Jason Gustafson  wrote:

> Hey Lucas,
>
> Thanks for the comments. Responses below:
>
> > Given that it's possible for replica producer states to diverge from each
> other, it would be very useful if DescribeProducers(Request,Response) and
> tooling is able to query all partition replicas for their producers
>
> Yes, it makes sense to me to let DescribeProducers work on both followers
> and leaders. In fact, I'm encouraged that there are use cases for this work
> other than detecting hanging transactions. That was indeed the hope, but I
> didn't have anything specific in mind. I will update the proposal.
>
> > Would it be worth returning transactional.id.expiration.ms in the
> DescribeProducersResponse?
>
> That's an interesting thought as well. Are you trying to avoid the need to
> specify it through the command line? The tool could also query the value
> with DescribeConfigs I suppose.
>
> Thanks,
> Jason
>
> On Thu, Aug 27, 2020 at 10:48 AM Lucas Bradstreet 
> wrote:
>
> > Hi Jason,
> >
> > This looks like a very useful tool, thanks for writing it up.
> >
> > Given that it's possible for replica producer states to diverge from each
> > other, it would be very useful if DescribeProducers(Request,Response) and
> > tooling is able to query all partition replicas for their producers. One
> > way I can see this being used immediately is in kafka's system tests,
> > especially the ones that inject failures. At the end of the test we can
> > query all replicas and make sure that their states have not diverged. I
> can
> > also see it being useful when debugging production clusters too.
> >
> > Would it be worth returning transactional.id.expiration.ms in the
> > DescribeProducersResponse?
> >
> > Cheers,
> >
> > Lucas
> >
> >
> >
> > On Wed, Aug 26, 2020 at 12:12 PM Ron Dagostino 
> wrote:
> >
> > > Yes, that definitely sounds reasonable.  Thanks, Jason!
> > >
> > > Ron
> > >
> > > On Wed, Aug 26, 2020 at 3:03 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hey Ron,
> > > >
> > > > We do not typically backport new APIs to older versions. I think we
> can
> > > > however make the --abort command compatible with older versions. It
> > would
> > > > require a user to do some analysis on their own to identify a hanging
> > > > transaction, but then they can use the tool from a new release to
> > > recover.
> > > > For example, users could detect a hanging transaction through the
> > > existing
> > > > "LastStableOffsetLag" metric and then collect the needed information
> > > from a
> > > > dump of the log (or producer snapshot). It's more work, but at least
> > it's
> > > > possible. Does that sound fair?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Wed, Aug 26, 2020 at 11:51 AM Ron Dagostino 
> > > wrote:
> > > >
> > > > > Hi Jason.  Thanks for the excellently-written KIP.
> > > > >
> > > > > Will the implementation be backported to prior Kafka versions?  The
> > > > reason
> > > > > I ask is because if it is not backported and similar functionality
> is
> > > not
> > > > > otherwise made available for older versions, then the only recourse
> > > > (aside
> > > > > from deleting and recreating the topic as you pointed out) may be
> to
> > > > > upgrade to 2.7 (or whatever version ends up getting this
> > > functionality).
> > > > > Such an upgrade may not be desirable, especially if the number of
> > > > > intermediate versions is considerable. I understand the mantra of
> > > "never
> > > > > fall too many versions behind" but the reality of it is that it
> isn't
> > > > > always the case.  Even if the version is relatively recent, an
> > upgrade
> > > > may
> > > > > still not be possible for some time, and a quicker resolution may
> be
> > > > > necessary.
> > > > >
> > > > > Ron
> > > > >
> > > > > On Wed, Aug 26, 2020 at 2:33 PM Jason Gustafson <
> ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I've added a proposal to handle the problem of hanging
> > transactions:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions
> > > > > > .
> > > > > > In theory, this should never happen. In practice, we have hit one
> > bug
> > > > > where
> > > > > > it was possible and there are few good options today to recover.
> > > Take a
> > > > > > look and let me know what you think.
> > > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-662: Throw Exception when Source Topics of a Streams App are Deleted

2020-08-27 Thread Matthias J. Sax
+1 (binding)

On 8/27/20 1:10 PM, John Roesler wrote:
> Thanks, Bruno!
> 
> I'm +1 (binding)
> 
> -John
> 
> On Thu, 2020-08-27 at 15:35 -0400, Bill Bejeck wrote:
>> Thanks for the KIP Bruno.
>>
>> +1 (binding)
>>
>> -Bill
>>
>> On Thu, Aug 27, 2020 at 3:15 PM Walker Carlson 
>> wrote:
>>
>>> +1 (Non Binding). Good Kip Bruno
>>>
>>> Walker
>>>
>>> On Tue, Aug 25, 2020 at 11:17 AM Guozhang Wang  wrote:
>>>
 +1. Thanks Bruno!


 Guozhang

 On Tue, Aug 25, 2020 at 4:00 AM Bruno Cadonna 
>>> wrote:
> Hi,
>
> I would like to start the vote for
>
>
>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted
> Best,
> Bruno
>

 --
 -- Guozhang

> 





Re: [DISCUSS] KIP-654 Aborting Transaction with non-flushed data should throw a non-fatal Exception

2020-08-27 Thread Matthias J. Sax
Thanks for the KIP. Looks good overall.

However, I am wondering if the new exception should extend
`KafkaException`? It seems extending `ApiException` or maybe even
`RetriableException` might be better?

About the name itself. I would prefer something simpler like
`AbortedTransactionException`.

Thoughts?


-Matthias


On 8/27/20 10:24 AM, Gokul Srinivas wrote:
> Hello all,
> 
> I would like to propose the following KIP to throw a new non-fatal
> exception whilst aborting transactions with non-flushed data. This will
> help users distinguish non-fatal errors and potentially retry the batch.
> 
> *Issue *- https://issues.apache.org/jira/browse/KAFKA-10186
> 
> 
> *KIP *-
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-654:+Aborted+transaction+with+non-flushed+data+should+throw+a+non-fatal+exception
> 
> 
> 
> Please let me know how best we can proceed with this.
> 
> -Gokul
> 
> 





Build failed in Jenkins: Kafka » kafka-trunk-jdk8 #35

2020-08-27 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9924: Add RocksDB metric num-entries-active-mem-table (#9177)


--
[...truncated 3.22 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher PASSED

org.apache.kafka.streams.test.TestRecordTest > testFields STARTED

org.apache.kafka.streams.test.TestRecordTest > testFields PASSED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode STARTED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithLogging STARTED


Re: [DISCUSS] KIP-654 Aborting Transaction with non-flushed data should throw a non-fatal Exception

2020-08-27 Thread Sophie Blee-Goldman
Hey Gokul, thanks for taking up this KIP!

I agree with Matthias that directly extending KafkaException may not be
ideal, and we should instead extend ApiException or RetriableException. Of
the two, I think ApiException would be more appropriate. My understanding is
that RetriableException is generally reserved for internally retriable
exceptions, whereas ApiException is used for pseudo-fatal exceptions that
require some user input as to how to proceed (e.g. ProducerFencedException).

I also agree that the name could be a bit more concise. My personal vote
would be for "TransactionAbortedException" which seems a bit more
grammatically aligned with the other exceptions in Kafka.
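
Just to make that concrete, a minimal sketch of what I mean (not a final
class, and the exact package and constructors are up to the KIP):

    // Sketch only: the new exception extending ApiException rather than KafkaException directly.
    public class TransactionAbortedException extends ApiException {

        public TransactionAbortedException(final String message) {
            super(message);
        }
    }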

Cheers,
Sophie

On Thu, Aug 27, 2020 at 6:01 PM Matthias J. Sax  wrote:

> Thanks for the KIP. Looks good overall.
>
> However, I am wondering if the new exception should extend
> `KafkaException`? It seems, extending `ApiException` or maybe even
> `RetriableException` might be better?
>
> About the name itself. I would prefer something simpler like
> `AbortedTransactionException`.
>
> Thoughts?
>
>
> -Matthias
>
>
> On 8/27/20 10:24 AM, Gokul Srinivas wrote:
> > Hello all,
> >
> > I would like to propose the following KIP to throw a new non-fatal
> > exception whilst aborting transactions with non-flushed data. This will
> > help users distinguish non-fatal errors and potentially retry the batch.
> >
> > *Issue *- https://issues.apache.org/jira/browse/KAFKA-10186
> > 
> >
> > *KIP *-
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-654:+Aborted+transaction+with+non-flushed+data+should+throw+a+non-fatal+exception
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-654:+Aborted+transaction+with+non-flushed+data+should+throw+a+non-fatal+exception
> >
> >
> >
> > Please let me know how best we can proceed with this.
> >
> > -Gokul
> >
> >
>
>


Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-08-27 Thread Sophie Blee-Goldman
Ok I'm definitely feeling pretty dumb now, but I was just thinking how
ridiculous it is that the Consumer forces you to configure your
Deserializer through actual config maps instead of just taking the ones you
pass in directly. So I thought "why not just fix the Consumer to allow
passing in an actual Deserializer object" and went to go through the code
in case there's some legitimate reason why not, and what do you know. You
actually can pass in an actual Deserializer object! There is a
KafkaConsumer constructor that accepts a key and value Deserializer, and
doesn't instantiate or configure a new one if provided in this way. Duh.

Sorry for misleading everyone on that front. I'm just happy to find out
that a reasonable way of configuring deserializers actually *is* possible
after all. In that case, maybe we can remove the extra configs from this
KIP and just proceed with the deprecation?
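
For the record, a minimal sketch of that constructor in action, with a
pre-configured windowed deserializer (the topic name, group id, and 5-minute
window size are made up for illustration):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
    import org.apache.kafka.streams.kstream.Windowed;

    public class WindowedConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "windowed-reader");

            // The deserializer is constructed with an explicit window size, so no
            // default windowed serde configs are needed: the consumer does not
            // instantiate or configure a deserializer that is passed in directly.
            long windowSizeMs = Duration.ofMinutes(5).toMillis();
            TimeWindowedDeserializer<String> keyDeserializer =
                new TimeWindowedDeserializer<>(new StringDeserializer(), windowSizeMs);

            try (KafkaConsumer<Windowed<String>, String> consumer =
                     new KafkaConsumer<>(props, keyDeserializer, new StringDeserializer())) {
                consumer.subscribe(Collections.singletonList("word-counts"));
                consumer.poll(Duration.ofSeconds(1)).forEach(record ->
                    System.out.println(record.key() + " -> " + record.value()));
            }
        }
    }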

Obviously that doesn't help anything with regards to the remaining
question that John/Leah have posed. Now I probably don't have anything
valuable to offer there since I know next to nothing about Scala, but I do
want to better understand: why would we add an "implicit" (what exactly
does this mean?) that relies on allowing users to not set the windowSize,
if we are explicitly taking away that option from the Java users? Or if we
have already added something, can't we just deprecate it like we are
deprecating the Java constructor? I may need some remedial lessons in Scala
just to understand the problem that we apparently have, because I don't get
it.

By the way, I'm a little tempted to say that we should go one step further
and deprecate the DEFAULT_WINDOWED_INNER_CLASS configs, but maybe that's a
bit too radical for the moment. It just seems like default serde configs
have been a lot more trouble than they're worth overall. That said, these
particular configs don't appear to have hurt anyone thus far, at least not
that we know of (possibly because no one is using them anyway), so there's
no strong motivation to do so.

On Wed, Aug 26, 2020 at 9:19 AM Leah Thomas  wrote:

> Hey John,
>
> Thanks for pointing this out, I wasn't sure how to handle the Scala
> changes.
>
> I'm not fully versed in the Scala version of Streams, so feel free to
> correct me if any of my assumptions are wrong. I think logging an error
> message and then calling the constructor that requires a windowSize seems
> like the simplest fix from my point of view. So instead of
> calling `TimeWindowedSerde(final Serde inner)`, we could
> call `TimeWindowedSerde(final Serde inner, final long windowSize)` with
> Long.MAX_VALUE as the window size.
>
> I do feel like we would want to add an implicit to `Serdes.scala` that
> takes a serde and a window size so that users can access the constructor
> that initializes with the correct window size. I agree with your comment on
> the KIP-616 PR that the serde needs to be pre-configured when it's passed,
> but I'm not sure we would need a windowSize config. I think if the
> constructor is passed the serde and the window size, then window size
> should be set within the deserializer. The only catch is if the Scala
> version of the consumer creates a new deserializer, and at that point we'd
> need a window size config, but I'm not sure if that's the case.
>
> WDYT - is it possible to alter the existing implicit and add a new one?
>
> On Wed, Aug 26, 2020 at 10:00 AM John Roesler  wrote:
>
> > Hi Leah,
> >
> > I was just reviewing the PR for KIP-616 and realized that we
> > forgot to mention the Scala API in your KIP. We should
> > consider it because `scala.Serdes.timeWindowedSerde` is
> > implicitly using the exact constructor you're deprecating.
> >
> > I had some ideas in the code review:
> > https://github.com/apache/kafka/pull/8955#discussion_r477358755
> >
> > What do you think is the best approach?
> >
> > Concretely, I think Yuriy can make the call for KIP-616 (for
> > the new implicit that he's adding). But I think your KIP-659
> > should mention how we modify the existing implicit.
> >
> > Typically, we'd try to avoid throwing new exceptions or
> > causing compile errors, so
> > * dropping the implicit is probably off the table (compile
> > error).
> > * throwing an exception in the deserializer may not be ok,
> > although it might still actually be ok since it's adding a
> > corruption check.
> > * logging an ERROR message and then passing through to the
> > underlying deserializer would be more conservative.
> >
> > What do you think we should do?
> >
> > Thanks,
> > -John
> >
> > On Fri, 2020-08-21 at 16:05 -0500, Leah Thomas wrote:
> > > Thanks for the typo catch, John.
> > >
> > > Let me know if anyone else has thoughts or ideas.
> > >
> > > Cheers,
> > > Leah
> > >
> > > On Fri, Aug 21, 2020 at 2:50 PM John Roesler 
> > wrote:
> > >
> > > > Thanks, all,
> > > >
> > > > Based on my reading of the conversation, it sounds like I
> > > > have some legwork to do in KIP-645, but our collective
> > > > instinct 

[GitHub] [kafka-site] scott-confluent opened a new pull request #300: [MINOR] adding Itau Unibanco and OTICS to the powered-by page

2020-08-27 Thread GitBox


scott-confluent opened a new pull request #300:
URL: https://github.com/apache/kafka-site/pull/300


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [DISCUSS] KIP-656: MirrorMaker2 Exactly-once Semantics

2020-08-27 Thread Ning Zhang
Hello Mickael,

> 1. How does offset translation work with this new sink connector?
> Should we also include a CheckpointSinkConnector?

The CheckpointSourceConnector will be re-used as-is. When EOS is enabled, we 
will run 3 connectors:

MirrorSinkConnector (based on SinkConnector)
MirrorCheckpointConnector (based on SourceConnector)
MirrorHeartbeatConnector (based on SourceConnector)

For the last two connectors (checkpoint, heartbeat), if we do not strictly 
require EOS, it is probably OK to keep the current SourceConnector-based 
implementation.

I will update the KIP to clarify this, if it sounds acceptable.

> 2. Migrating to this new connector could be tricky as effectively the
> Connect runtime needs to point to the other cluster, so its state
> (stored in the __connect topics) is lost. Unfortunately there is no
> easy way today to prime Connect with offsets. Not necessarily a
> blocking issue but this should be described as I think the current
> Migration section looks really optimistic at the moment

Totally agree. I will update the migration section with a note that, without 
careful planning, there is potential for service interruption.

> 3. We can probably find a better name than "transaction.producer".
> Maybe we can follow a similar pattern than Streams (which uses
> "processing.guarantee")?

"processing.guarantee" sounds better

> 4. Transactional Ids used by the producer are generated based on the
> task assignments. If there's a single task, if it crashes and restarts
> it would still get the same id. Can this be an issue?

From https://tgrez.github.io/posts/2019-04-13-kafka-transactions.html, the 
author suggests postfixing transaction.id with :

"To avoid handling an external store we will use a static encoding similarly as 
in spring-kafka:
The transactional.id is now the transactionIdPrefix appended with 
..."

I think as long as no more than one producer uses the same "transaction.id" 
at the same time, it is OK.

Also from my tests, this "transaction.id" assignment works fine with failures. 
To tighten it up, I also tested using the "connector task id" in 
"transaction.id". The "connector task id" is typically composed of the 
connector_name and task_id, which is also unique across all connectors in a 
Kafka Connect cluster.
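
A rough sketch of that scheme, purely for illustration (the
"connectorName-taskId" layout and the helper below are my own, not the KIP's
actual code):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class MirrorSinkProducers {
        // One transactional producer per connector task. The id is unique across
        // the Connect cluster (connector names are unique, task ids are unique
        // within a connector) and stable across restarts of the same task.
        static KafkaProducer<byte[], byte[]> transactionalProducer(
                String connectorName, int taskId, String bootstrapServers) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            props.put("enable.idempotence", "true");
            props.put("transactional.id", connectorName + "-" + taskId);
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            return producer;
        }
    }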

 > 5. The logic in the KIP creates a new transaction every time put() is
> called. Is there a performance impact?

It could be a performance hit if the transaction batch is too small under a 
high ingestion rate. The batch size depends on how many messages the consumer 
polls each time. Maybe we could increase "max.poll.records" to get a larger 
batch size.
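
For reference, my reading of the per-put() flow is roughly the sketch below;
the helper methods, field wiring, and group id are placeholders rather than
the KIP's implementation:

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;

    abstract class TransactionalPutSketch {
        KafkaProducer<byte[], byte[]> producer;   // target-cluster producer
        String offsetsGroupId = "mm2-offsets";    // placeholder group id

        void put(Collection<SinkRecord> records) {
            if (records.isEmpty()) {
                return; // do not open a transaction for an empty batch
            }
            producer.beginTransaction();
            for (SinkRecord record : records) {
                producer.send(toProducerRecord(record));
            }
            // The consumed offsets ride in the same transaction, so the copied
            // data and its offsets commit atomically, or not at all.
            producer.sendOffsetsToTransaction(offsetsFor(records), offsetsGroupId);
            producer.commitTransaction();
        }

        abstract ProducerRecord<byte[], byte[]> toProducerRecord(SinkRecord record);

        abstract Map<TopicPartition, OffsetAndMetadata> offsetsFor(Collection<SinkRecord> records);
    }

With this shape, the transaction size per put() is bounded by how many records 
the framework's consumer hands over, which is why tuning "max.poll.records" 
matters.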

Overall, thanks so much for the valuable feedback. If the responses sound 
good, I will do a cleanup of the KIP.

On 2020/08/27 09:59:57, Mickael Maison  wrote: 
> Thanks Ning for the KIP. Having stronger guarantees when mirroring
> data would be a nice improvement!
> 
> A few comments:
> 1. How does offset translation work with this new sink connector?
> Should we also include a CheckpointSinkConnector?
> 
> 2. Migrating to this new connector could be tricky as effectively the
> Connect runtime needs to point to the other cluster, so its state
> (stored in the __connect topics) is lost. Unfortunately there is no
> easy way today to prime Connect with offsets. Not necessarily a
> blocking issue but this should be described as I think the current
> Migration section looks really optimistic at the moment
> 
> 3. We can probably find a better name than "transaction.producer".
> Maybe we can follow a similar pattern than Streams (which uses
> "processing.guarantee")?
> 
> 4. Transactional Ids used by the producer are generated based on the
> task assignments. If there's a single task, if it crashes and restarts
> it would still get the same id. Can this be an issue?
> 
> 5. The logic in the KIP creates a new transaction every time put() is
> called. Is there a performance impact?
> 
> On Fri, Aug 21, 2020 at 4:58 PM Ryanne Dolan  wrote:
> >
> > Awesome, this will be a huge advancement. I also want to point out that
> > this KIP implements MirrorSinkConnector as well, finally, which is a very
> > often requested missing feature in my experience.
> >
> > Ryanne
> >
> > On Fri, Aug 21, 2020, 9:45 AM Ning Zhang  wrote:
> >
> > > Hello, I wrote a KIP about MirrorMaker2 Exactly-once Semantics (EOS)
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-656%3A+MirrorMaker2+Exactly-once+Semantics
> > > At the high-level, it resembles the idea of how HDFS Sink Connector
> > > achieves EOS across clusters by managing and storing the consumer offsets
> > > in an external persistent storage, but also leverages the current Kafka 
> > > EOS
> > > guarantee within a single cluster. I have done some experiments especially
> > > for the failure cases and I am very appreciated for comments and feedback
> > > on this KIP from bigger audience.
> > >
> 


Requesting permission

2020-08-27 Thread Brandon Brown
I’d like to request permission to create a KIP. My username is brbrown35. 

Thanks,
Brandon 

Brandon Brown
(240) 506-8335 (m)

Re: Requesting permission

2020-08-27 Thread Jun Rao
Hi, Brandon,

Thanks for your interest. Just gave you the wiki permission.

Jun

On Thu, Aug 27, 2020 at 1:17 PM Brandon Brown 
wrote:

> I’d like to request permission to create a KIP. My username is brbrown35.
>
> Thanks,
> Brandon
>
> Brandon Brown
> (240) 506-8335 (m)


Re: [DISCUSS] KIP-664: Provide tooling to detect and abort hanging transactions

2020-08-27 Thread Jason Gustafson
Hi Boyang,

Thanks for the comments. Responses below:

> 1. For the analysis section, is there any consistency guarantee for
`ListTransactions` and `DescribeTransactions`? Let's say the coordinator
receives a DescribeTransactions while the transaction is almost complete at
the same time, should we have isolation to avoid returning stale
information? If an admin client sends excessive describe requests, would it
affect the normal processing on transactions?

It's similar to `ListGroups` and `DescribeGroup`. There is no guarantee on
the consistency between separate calls. The API is just reflecting the
current state. In general, tools need to be careful when trying to take
some action based on that state. This is one of the reasons I changed the
`WriteTxnMarkers` API to take the start offset of the transaction the user
wants to abort. In case we get into some race with the coordinator, at
least we won't end up aborting the wrong transaction.

> 2. I'm not sure whether `PartitionsWithLateTransactionsCount` is providing
much value here. Users have no responsibility to keep their transaction
session size below the max transaction timeout. As long as there is
prolonged progress being made from the client side, the root cause would be
on some ill-performing producers, which means we should monitor the
producer client instead.

Not sure I follow. The max transaction timeout is enforced in the
`AddPartitionsToTxn` API. The client is not allowed a larger timeout, so it
seems fair for the broker to treat transactions which have been open longer
than this as "late." The benefit of this metric is that it provides a
simple alert criterion (i.e. alert for any positive value).

> 3. It would be good to highlight new fields in the
`WriteTxnMarkersRequest`
schema

Ack. Will do.

> 4. The response schema in `ListTransactions` section was wrong, which
should be `ListTransactionsResponse`

Thanks. I'll fix the name, but the schema looks like what I wanted.

>5. From the original context of ticket 9144, the reasoning for the hanging
transaction is due to an uncensored open transaction on the partition
leader. Could we just add the direct admin request support like
`findAllSuspiciousTransaction` to detect that, by scanning all partition
leaders and transaction coordinators within the cluster, and figure out any
open transaction on the partition leader side not known to any coordinator?

This is basically what the --find-hanging command is doing. I debated
moving this logic either into the admin client or the broker, but in the
end, I decided it was better to have good general purpose APIs and let the
complexity be in the tooling.

> 6. typo: "In this case, we the command..." => "In this case, the
command..."

Ack. Will fix.

Thanks,
Jason



On Thu, Aug 27, 2020 at 2:46 PM Boyang Chen 
wrote:

> Thanks Jason for the tooling proposal. A couple of comments:
>
> 1. For the analysis section, is there any consistency guarantee for
> `ListTransactions` and `DescribeTransactions`? Let's say the coordinator
> receives a DescribeTransactions while the transaction is almost complete at
> the same time, should we have isolation to avoid returning stale
> information? If an admin client sends excessive describe requests, would it
> affect the normal processing on transactions?
>
> 2. I'm not sure whether `PartitionsWithLateTransactionsCount` is providing
> much value here. Users have no responsibility to keep their transaction
> session size below the max transaction timeout. As long as there is
> prolonged progress being made from the client side, the root cause would be
> on some ill-performing producers, which means we should monitor the
> producer client instead.
>
> 3. It would be good to highlight new fields in the `WriteTxnMarkersRequest`
> schema
>
> 4. The response schema in `ListTransactions` section was wrong, which
> should be `ListTransactionsResponse`
>
> 5. From the original context of ticket 9144, the reasoning for the hanging
> transaction is due to an uncensored open transaction on the partition
> leader. Could we just add the direct admin request support like
> `findAllSuspiciousTransaction` to detect that, by scanning all partition
> leaders and transaction coordinators within the cluster, and figure out any
> open transaction on the partition leader side not known to any coordinator?
>
> 6. typo: "In this case, we the command..." => "In this case, the
> command..."
>
> Boyang
>
> On Thu, Aug 27, 2020 at 11:44 AM Lucas Bradstreet 
> wrote:
>
> > >> Would it be worth returning transactional.id.expiration.ms in the
> > DescribeProducersResponse?
> >
> > > That's an interesting thought as well. Are you trying to avoid the need
> > to
> > specify it through the command line? The tool could also query the value
> > with DescribeConfigs I suppose.
> >
> > Basically. I'm not sure how useful this will be in practice, though it
> > might help when debugging.
> >
> > Lucas
> >
> > On Thu, Aug 27, 2020 at 11:00 AM Jason 

Re: [VOTE] KIP-662: Throw Exception when Source Topics of a Streams App are Deleted

2020-08-27 Thread John Roesler
Thanks, Bruno!

I'm +1 (binding)

-John

On Thu, 2020-08-27 at 15:35 -0400, Bill Bejeck wrote:
> Thanks for the KIP Bruno.
> 
> +1 (binding)
> 
> -Bill
> 
> On Thu, Aug 27, 2020 at 3:15 PM Walker Carlson 
> wrote:
> 
> > +1 (Non Binding). Good KIP, Bruno
> > 
> > Walker
> > 
> > On Tue, Aug 25, 2020 at 11:17 AM Guozhang Wang  wrote:
> > 
> > > +1. Thanks Bruno!
> > > 
> > > 
> > > Guozhang
> > > 
> > > On Tue, Aug 25, 2020 at 4:00 AM Bruno Cadonna 
> > wrote:
> > > > Hi,
> > > > 
> > > > I would like to start the vote for
> > > > 
> > > > 
> > > > 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted
> > > > Best,
> > > > Bruno
> > > > 
> > > 
> > > --
> > > -- Guozhang
> > > 



Re: Requesting permission

2020-08-27 Thread Brandon Brown
Thanks! I was able to make my first KIP. 

-Brandon 

Brandon Brown
(240) 506-8335 (m)

> On Aug 27, 2020, at 4:19 PM, Jun Rao  wrote:
> 
> Hi, Brandon,
> 
> Thanks for your interest. Just gave you the wiki permission.
> 
> Jun
> 
>> On Thu, Aug 27, 2020 at 1:17 PM Brandon Brown 
>> wrote:
>> 
>> I’d like to request permission to create a KIP. My username is brbrown35.
>> 
>> Thanks,
>> Brandon
>> 
>> Brandon Brown
>> (240) 506-8335 (m)


[DISCUSS] KIP-665 Kafka Connect Hash SMT

2020-08-27 Thread brandon



https://cwiki.apache.org/confluence/display/KAFKA/KIP-665%3A+Kafka+Connect+Hash+SMT

The current PR with the proposed changes is  
https://github.com/apache/kafka/pull/9057, and the original 3rd-party  
contribution which initiated this change is  
https://github.com/aiven/aiven-kafka-connect-transforms/issues/9#issuecomment-662378057.


I'm interested in any suggestions for ways to improve this as I think  
it would make a nice addition to the existing SMTs provided by Kafka  
Connect out of the box.
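
To make the discussion a bit more concrete, here is a rough sketch of what
such a transform could look like; the class name, the SHA-256 choice, and the
string-value-only handling are illustrative and may well differ from the
actual PR:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.Map;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.errors.DataException;
    import org.apache.kafka.connect.transforms.Transformation;

    public class HashValueSketch<R extends ConnectRecord<R>> implements Transformation<R> {

        @Override
        public void configure(Map<String, ?> configs) {
            // A real SMT would expose configs here, e.g. which field to hash
            // and which digest algorithm to use.
        }

        @Override
        public R apply(R record) {
            if (record.value() == null) {
                return record;
            }
            if (!(record.value() instanceof String)) {
                throw new DataException("This sketch only handles string values");
            }
            return record.newRecord(record.topic(), record.kafkaPartition(),
                    record.keySchema(), record.key(),
                    record.valueSchema(), sha256((String) record.value()),
                    record.timestamp());
        }

        private static String sha256(String value) {
            try {
                MessageDigest digest = MessageDigest.getInstance("SHA-256");
                byte[] bytes = digest.digest(value.getBytes(StandardCharsets.UTF_8));
                StringBuilder hex = new StringBuilder(bytes.length * 2);
                for (byte b : bytes) {
                    hex.append(String.format("%02x", b));
                }
                return hex.toString();
            } catch (Exception e) {
                throw new DataException("Failed to hash value", e);
            }
        }

        @Override
        public ConfigDef config() {
            return new ConfigDef();
        }

        @Override
        public void close() { }
    }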


Thanks,
Brandon





Re: [DISCUSS] KIP-664: Provide tooling to detect and abort hanging transactions

2020-08-27 Thread Boyang Chen
Thanks Jason for the tooling proposal. A couple of comments:

1. For the analysis section, is there any consistency guarantee for
`ListTransactions` and `DescribeTransactions`? Let's say the coordinator
receives a DescribeTransactions while the transaction is almost complete at
the same time, should we have isolation to avoid returning stale
information? If an admin client sends excessive describe requests, would it
affect the normal processing on transactions?

2. I'm not sure whether `PartitionsWithLateTransactionsCount` is providing
much value here. Users have no responsibility to keep their transaction
session size below the max transaction timeout. As long as there is
prolonged progress being made from the client side, the root cause would be
on some ill-performing producers, which means we should monitor the
producer client instead.

3. It would be good to highlight new fields in the `WriteTxnMarkersRequest`
schema

4. The response schema in `ListTransactions` section was wrong, which
should be `ListTransactionsResponse`

5. From the original context of ticket 9144, the reasoning for the hanging
transaction is due to an uncensored open transaction on the partition
leader. Could we just add the direct admin request support like
`findAllSuspiciousTransaction` to detect that, by scanning all partition
leaders and transaction coordinators within the cluster, and figure out any
open transaction on the partition leader side not known to any coordinator?

6. typo: "In this case, we the command..." => "In this case, the
command..."

Boyang

On Thu, Aug 27, 2020 at 11:44 AM Lucas Bradstreet 
wrote:

> >> Would it be worth returning transactional.id.expiration.ms in the
> DescribeProducersResponse?
>
> > That's an interesting thought as well. Are you trying to avoid the need
> to
> specify it through the command line? The tool could also query the value
> with DescribeConfigs I suppose.
>
> Basically. I'm not sure how useful this will be in practice, though it
> might help when debugging.
>
> Lucas
>
> On Thu, Aug 27, 2020 at 11:00 AM Jason Gustafson 
> wrote:
>
> > Hey Lucas,
> >
> > Thanks for the comments. Responses below:
> >
> > > Given that it's possible for replica producer states to diverge from
> each
> > other, it would be very useful if DescribeProducers(Request,Response) and
> > tooling is able to query all partition replicas for their producers
> >
> > Yes, it makes sense to me to let DescribeProducers work on both followers
> > and leaders. In fact, I'm encouraged that there are use cases for this
> work
> > other than detecting hanging transactions. That was indeed the hope, but
> I
> > didn't have anything specific in mind. I will update the proposal.
> >
> > > Would it be worth returning transactional.id.expiration.ms in the
> > DescribeProducersResponse?
> >
> > That's an interesting thought as well. Are you trying to avoid the need
> to
> > specify it through the command line? The tool could also query the value
> > with DescribeConfigs I suppose.
> >
> > Thanks,
> > Jason
> >
> > On Thu, Aug 27, 2020 at 10:48 AM Lucas Bradstreet 
> > wrote:
> >
> > > Hi Jason,
> > >
> > > This looks like a very useful tool, thanks for writing it up.
> > >
> > > Given that it's possible for replica producer states to diverge from
> each
> > > other, it would be very useful if DescribeProducers(Request,Response)
> and
> > > tooling is able to query all partition replicas for their producers.
> One
> > > way I can see this being used immediately is in kafka's system tests,
> > > especially the ones that inject failures. At the end of the test we can
> > > query all replicas and make sure that their states have not diverged. I
> > can
> > > also see it being useful when debugging production clusters too.
> > >
> > > Would it be worth returning transactional.id.expiration.ms in the
> > > DescribeProducersResponse?
> > >
> > > Cheers,
> > >
> > > Lucas
> > >
> > >
> > >
> > > On Wed, Aug 26, 2020 at 12:12 PM Ron Dagostino 
> > wrote:
> > >
> > > > Yes, that definitely sounds reasonable.  Thanks, Jason!
> > > >
> > > > Ron
> > > >
> > > > On Wed, Aug 26, 2020 at 3:03 PM Jason Gustafson 
> > > > wrote:
> > > >
> > > > > Hey Ron,
> > > > >
> > > > > We do not typically backport new APIs to older versions. I think we
> > can
> > > > > however make the --abort command compatible with older versions. It
> > > would
> > > > > require a user to do some analysis on their own to identify a
> hanging
> > > > > transaction, but then they can use the tool from a new release to
> > > > recover.
> > > > > For example, users could detect a hanging transaction through the
> > > > existing
> > > > > "LastStableOffsetLag" metric and then collect the needed
> information
> > > > from a
> > > > > dump of the log (or producer snapshot). It's more work, but at
> least
> > > it's
> > > > > possible. Does that sound fair?
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Wed, Aug 26, 2020 at 11:51 AM Ron Dagostino 
> 

Jenkins build is back to normal : Kafka » kafka-trunk-jdk11 #37

2020-08-27 Thread Apache Jenkins Server
See 




Build failed in Jenkins: Kafka » kafka-trunk-jdk15 #36

2020-08-27 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9924: Add RocksDB metric num-entries-active-mem-table (#9177)


--
[...truncated 6.48 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPassRecordHeadersIntoSerializersAndDeserializers[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPassRecordHeadersIntoSerializersAndDeserializers[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest >