[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018781#comment-17018781
 ] 

Guozhang Wang commented on KAFKA-9450:
--

I think John's idea is, e.g., to store the current offset of the changelog under 
a reserved key, so that each time we update a value, we also update this key 
with the new offset. By doing this we would not need a RocksDB#flush when 
checkpointing, nor would we need a checkpoint file. One tricky thing, however, 
is that the offset of the written changelog record is only known after the 
batched produce response returns, which may not be immediately after the 
RocksDB value update.
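Below is a minimal sketch of the reserved-key idea, built on a few assumptions. An in-memory map stands in for RocksDB, and the names `OffsetTrackingStore`, `OFFSET_KEY` and `onChangelogAck` are hypothetical, not Kafka Streams or RocksDB APIs. To handle the tricky part, the sketch buffers value updates until the produce response reports the changelog offset. It then applies the buffered values and the offset key together, much as a single RocksDB `WriteBatch` would.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: an in-memory stand-in for a RocksDB-backed store that keeps the
// changelog offset under a reserved key instead of in a checkpoint file.
class OffsetTrackingStore {
    // Hypothetical reserved key; a real store would need a key that can never
    // collide with user data (or a separate column family).
    static final String OFFSET_KEY = "__changelog_offset__";

    // Models what has been durably written together with its offset.
    private final Map<String, String> persisted = new HashMap<>();
    // Value updates whose changelog offset is not known yet.
    private final Map<String, String> pending = new HashMap<>();

    void put(String key, String value) {
        pending.put(key, value);
    }

    // Reads see buffered updates first, then persisted data.
    String get(String key) {
        String v = pending.get(key);
        return v != null ? v : persisted.get(key);
    }

    // Invoked when the produce response for the buffered updates returns with
    // the offset of the last changelog record; values and offset are applied
    // as one unit, as a single write batch would be.
    void onChangelogAck(long lastOffset) {
        persisted.putAll(pending);
        persisted.put(OFFSET_KEY, Long.toString(lastOffset));
        pending.clear();
    }

    // Offset to resume changelog restoration from, or -1 if none is stored.
    long storedOffset() {
        String v = persisted.get(OFFSET_KEY);
        return v == null ? -1L : Long.parseLong(v);
    }
}
```

Reads check the buffer first, so a task still sees its own not-yet-acknowledged writes. The sketch does not address how this would interact with aborted EOS transactions.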

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores and recreate them from scratch anyway during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.
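The proposal can be illustrated with a toy two-layer store. It uses hypothetical names (`LayeredStore`, `flushCache`, `flushInner`) rather than the real Kafka Streams store interfaces. Under EOS, `commit` forwards cached records to the changelog but leaves the inner store's memtable alone. The inner flush happens only on `close`, which stands in for a graceful shutdown or a rebalance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-layer store: a caching layer in front of an inner
// persistent store. Names are illustrative, not the Kafka Streams API.
class LayeredStore {
    final Map<String, String> cache = new HashMap<>();
    final Map<String, String> innerMemtable = new HashMap<>(); // unflushed inner writes
    final Map<String, String> innerDisk = new HashMap<>();     // flushed inner data
    final List<String> changelog = new ArrayList<>();          // records sent to the changelog
    int innerFlushes = 0;

    void put(String key, String value) {
        cache.put(key, value);
    }

    // Flush the caching layer only: forward cached records to the changelog
    // and hand them to the inner store without forcing it to disk.
    void flushCache() {
        for (Map.Entry<String, String> e : cache.entrySet()) {
            changelog.add(e.getKey() + "=" + e.getValue());
            innerMemtable.put(e.getKey(), e.getValue());
        }
        cache.clear();
    }

    // Force the inner store to persist its memtable (analogous to RocksDB#flush).
    void flushInner() {
        innerDisk.putAll(innerMemtable);
        innerMemtable.clear();
        innerFlushes++;
    }

    // With EOS, skip the inner flush: an unclean shutdown wipes the store anyway.
    void commit(boolean eosEnabled) {
        flushCache();
        if (!eosEnabled) {
            flushInner();
        }
    }

    // Graceful shutdown or rebalance: flush every layer.
    void close() {
        flushCache();
        flushInner();
    }
}
```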



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018750#comment-17018750
 ] 

Matthias J. Sax commented on KAFKA-9450:


[~vvcephei] Your comment seems to be an orthogonal concern (one that I actually 
don't share: when we call RocksDB#flush(), it seems safe to assume that 
RocksDB has persisted the data. Why do you doubt that RocksDB guarantees 
this? If it did not persist the data, that would be a RocksDB bug IMHO that 
should just get fixed.)

Nevertheless, this ticket is about decoupling changelog flushing from local 
disk flushing; in contrast, your comment is about two aspects of local disk 
flushing, i.e., the data itself and the metadata (i.e., the checkpoint).



[jira] [Updated] (KAFKA-9453) Make transaction.id optional in group mode EOS

2020-01-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9453:
---
Description: 
After KIP-447, one of the big improvements is that we no longer require a 
single-writer scope guarantee, so users don't have to configure a unique 
transactional.id for transaction safety.

In fact, absent security concerns, we could even avoid using the 
initTransactions API as well.

  was:After KIP-447, one of the big improvements is that we no longer require 
a single-writer scope guarantee, so users don't have to configure a unique 
transactional.id for transaction safety.


> Make transaction.id optional in group mode EOS
> --
>
> Key: KAFKA-9453
> URL: https://issues.apache.org/jira/browse/KAFKA-9453
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> After KIP-447, one of the big improvements is that we no longer require a 
> single-writer scope guarantee, so users don't have to configure a 
> unique transactional.id for transaction safety.
> In fact, absent security concerns, we could even avoid using the 
> initTransactions API as well.





[jira] [Created] (KAFKA-9454) Relax transaction.id security requirement

2020-01-18 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9454:
--

 Summary: Relax transaction.id security requirement
 Key: KAFKA-9454
 URL: https://issues.apache.org/jira/browse/KAFKA-9454
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


As we are no longer required to configure transactional.id on the client, we 
could piggy-back the security check on the consumer group id instead.





[jira] [Created] (KAFKA-9453) Make transaction.id optional in group mode EOS

2020-01-18 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9453:
--

 Summary: Make transaction.id optional in group mode EOS
 Key: KAFKA-9453
 URL: https://issues.apache.org/jira/browse/KAFKA-9453
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


After KIP-447, one of the big improvements is that we no longer require a 
single-writer scope guarantee, so users don't have to configure a unique 
transactional.id for transaction safety.





[jira] [Commented] (KAFKA-9346) Consumer fetch offset back-off with pending transactions

2020-01-18 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018730#comment-17018730
 ] 

Travis Bischel commented on KAFKA-9346:
---

Part of KIP-447, in 2.5.0.

> Consumer fetch offset back-off with pending transactions
> 
>
> Key: KAFKA-9346
> URL: https://issues.apache.org/jira/browse/KAFKA-9346
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>






[jira] [Commented] (KAFKA-9365) Add consumer group information to txn commit

2020-01-18 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018729#comment-17018729
 ] 

Travis Bischel commented on KAFKA-9365:
---

Part of KIP-447, in 2.5.0.

> Add consumer group information to txn commit 
> -
>
> Key: KAFKA-9365
> URL: https://issues.apache.org/jira/browse/KAFKA-9365
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> This effort adds consumer group information to the txn commit protocol on the 
> broker side.





[jira] [Commented] (KAFKA-6144) Allow serving interactive queries from in-sync Standbys

2020-01-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018670#comment-17018670
 ] 

ASF GitHub Bot commented on KAFKA-6144:
---

vinothchandar commented on pull request #7868: KAFKA-6144: Allow state stores 
to serve stale reads during rebalance
URL: https://github.com/apache/kafka/pull/7868
 
 
   
 



> Allow serving interactive queries from in-sync Standbys
> ---
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Navinder Brar
>Priority: Major
>  Labels: kip-535
> Fix For: 2.5.0
>
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently, when expanding the Kafka Streams cluster, the new node's partitions 
> are unavailable during the rebalance. For large state stores this can take a 
> very long time, and even for small state stores, anything more than a few ms 
> can be a deal-breaker for microservice use cases.
> One workaround is to allow stale data to be read from the state stores when 
> the use case allows it. Adding the use case from KAFKA-8994, as it is more 
> descriptive.
> "Consider the following scenario in a three-node Streams cluster with node A, 
> node B and node R, executing a stateful sub-topology/topic group with 1 
> partition and {{num.standby.replicas=1}}:
>  * *t0*: A is the active instance owning the partition, B is the standby that 
> keeps replicating A's state to its local disk, and R just routes Streams 
> IQs to the active instance using StreamsMetadata
>  * *t1*: IQs pick node R as router; R forwards the query to A, and A responds 
> to R, which forwards the results back.
>  * *t2*: Active instance A is killed and a rebalance begins. IQs to A start 
> failing
>  * *t3*: The rebalance assignment happens and standby B is promoted to active 
> instance. IQs continue to fail
>  * *t4*: B fully catches up to the changelog tail and rewinds offsets to A's 
> last commit position; IQs continue to fail
>  * *t5*: IQs to R get routed to B, which is now ready to serve results. IQs 
> start succeeding again
>  
> Depending on the Kafka consumer group session/heartbeat timeouts, steps t2 and 
> t3 can take a few seconds (~10 seconds with default values). Depending on how 
> far behind standby B was before A was killed, t4 can take anywhere from a few 
> seconds to minutes.
> While this behavior favors consistency over availability at all times, the 
> long unavailability window might be undesirable for certain classes of 
> applications (e.g., simple caches or dashboards).
> This issue aims to also expose information about standby B to R during each 
> rebalance, so that an application can route queries to a standby 
> to serve stale reads, choosing availability over consistency."





[jira] [Assigned] (KAFKA-7658) Add KStream#toTable to the Streams DSL

2020-01-18 Thread highluck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

highluck reassigned KAFKA-7658:
---

Assignee: highluck  (was: Aishwarya Pradeep Kumar)

> Add KStream#toTable to the Streams DSL
> --
>
> Key: KAFKA-7658
> URL: https://issues.apache.org/jira/browse/KAFKA-7658
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: highluck
>Priority: Major
>  Labels: kip, newbie
>
> KIP-523: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL]
>  
> We'd like to add a new API to the KStream object of the Streams DSL:
> {code:java}
> // proposed methods on KStream<K, V>
> KTable<K, V> toTable();
> KTable<K, V> toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
> {code}
> The function reinterprets the event stream {{KStream}} as a changelog stream 
> {{KTable}}. Note that this should NOT be treated as syntactic sugar for a dummy 
> {{KStream.reduce()}} function that always takes the new value, as it has the 
> following differences:
> 1) An aggregation operator of {{KStream}} aggregates an event stream 
> into an evolving table and drops null values from the input event 
> stream, whereas the {{toTable}} function completely changes the semantics 
> of the input stream from event stream to changelog stream: null values 
> are still serialized, and if the resulting bytes are also null, they are 
> interpreted as "deletes" to the materialized KTable (i.e., tombstones in 
> the changelog stream).
> 2) The aggregation result {{KTable}} is always materialized, whereas the 
> {{KTable}} resulting from {{toTable}} may only be materialized if the 
> overloaded function with Materialized is used (and if optimization is turned 
> on, it may still be only logically materialized if the queryable name is not set).
> Therefore, users who want to turn an event stream into a changelog stream 
> (for whatever reason they cannot read the source topic as a changelog stream 
> {{KTable}} in the first place) should use this new API instead of 
> the dummy reduction function.
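Here is a simplified model of difference 1). It assumes records are plain `(key, value)` pairs and that a `null` value stands for a record whose serialized bytes are null. `TableSemantics` and its methods are illustrative names, not part of the Streams DSL.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of applying a record sequence to a table; serdes are ignored.
class TableSemantics {
    static Map.Entry<String, String> rec(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value); // allows null values
    }

    // Aggregation-style semantics (a dummy reduce that keeps the newest value):
    // records with a null value are dropped.
    static Map<String, String> viaReduce(List<Map.Entry<String, String>> events) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> e : events) {
            if (e.getValue() == null) {
                continue; // null values are skipped by the aggregation
            }
            table.put(e.getKey(), e.getValue());
        }
        return table;
    }

    // toTable-style changelog semantics: a null value is a tombstone.
    static Map<String, String> viaToTable(List<Map.Entry<String, String>> events) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> e : events) {
            if (e.getValue() == null) {
                table.remove(e.getKey()); // delete the key
            } else {
                table.put(e.getKey(), e.getValue());
            }
        }
        return table;
    }
}
```

For the input `k=v1` followed by `k=null`, the reduce-style table keeps `k=v1`, while the `toTable`-style table no longer contains `k`.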





[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-18 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018602#comment-17018602
 ] 

Ted Yu commented on KAFKA-9450:
---

W.r.t. a separate column family: since the data in this family tends to be small 
compared to the data family, wouldn't we end up with small files, similar to the 
RocksDB memtable flush?



[jira] [Updated] (KAFKA-9424) Using AclCommand,avoid call the global method loadcache in SimpleAclAuthorizer

2020-01-18 Thread Steven Lu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Lu updated KAFKA-9424:
-
Reviewer: Rajini Sivaram  (was: Manikumar)

> Using AclCommand,avoid call the global method loadcache in SimpleAclAuthorizer
> --
>
> Key: KAFKA-9424
> URL: https://issues.apache.org/jira/browse/KAFKA-9424
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, tools
>Affects Versions: 0.10.2.0, 2.4.0, 2.3.1
> Environment: Linux,JDK7+
>Reporter: Steven Lu
>Priority: Major
>  Labels: Solved
>   Original Estimate: 3h
>  Remaining Estimate: 3h
>
> AclCommand configures SimpleAclAuthorizer, but there is no need for it to call 
> loadCache.
> We now have 20,000 topics in our Kafka cluster; every time I run AclCommand, 
> the ACLs of all these topics have to be loaded, which is very slow.
> The purpose of this optimization is to let us choose not to load the ACLs of 
> all topics into memory, mainly when adding and deleting permissions.
> PR available here: [https://github.com/apache/kafka/pull/7706]
> Mainly for adding and deleting permissions, we can choose not to load the ACLs 
> of all topics into memory by passing the arguments "--load-acl-cache" "false" 
> to AclCommand.main; if these arguments are not passed, the ACL cache is loaded 
> by default.
> This can improve the running time from minutes to less than one second.
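The idea of skipping the full cache load can be sketched with a toy backend that counts how many resources it reads. `AclBackend`, `AclTool` and the `loadAclCache` flag are hypothetical stand-ins, with the flag mirroring the proposed `--load-acl-cache` argument. They are not the `SimpleAclAuthorizer` or `AclCommand` APIs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical per-resource ACL storage (standing in for ZooKeeper) that
// counts reads so the cost of each strategy is visible.
class AclBackend {
    private final Map<String, Set<String>> aclsByResource = new HashMap<>();
    int resourcesRead = 0;

    Set<String> read(String resource) {
        resourcesRead++;
        return new HashSet<>(aclsByResource.getOrDefault(resource, new HashSet<>()));
    }

    void write(String resource, Set<String> acls) {
        aclsByResource.put(resource, new HashSet<>(acls));
    }

    Set<String> resources() {
        return aclsByResource.keySet();
    }
}

class AclTool {
    // Eager mode: read every resource's ACLs, as a full cache load would.
    static void loadAll(AclBackend backend) {
        for (String r : new HashSet<>(backend.resources())) {
            backend.read(r);
        }
    }

    // Adding an ACL only needs the target resource to be read and rewritten.
    static void addAcl(AclBackend backend, String resource, String acl, boolean loadAclCache) {
        if (loadAclCache) {
            loadAll(backend);
        }
        Set<String> acls = backend.read(resource);
        acls.add(acl);
        backend.write(resource, acls);
    }
}
```

With 1,000 resources, adding an ACL without the full load reads one resource. With the full load, it reads 1,001.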





[jira] [Updated] (KAFKA-9424) Using AclCommand,avoid call the global method loadcache in SimpleAclAuthorizer

2020-01-18 Thread Steven Lu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Lu updated KAFKA-9424:
-
Reviewer: Manikumar  (was: Ismael Juma)



[jira] [Created] (KAFKA-9452) Add new cached authorizer:change the dim of cache

2020-01-18 Thread Steven Lu (Jira)
Steven Lu created KAFKA-9452:


 Summary: Add new cached authorizer:change the dim of cache
 Key: KAFKA-9452
 URL: https://issues.apache.org/jira/browse/KAFKA-9452
 Project: Kafka
  Issue Type: Improvement
  Components: security
Reporter: Steven Lu


Similar to https://issues.apache.org/jira/browse/KAFKA-5261.

We hit the same performance issue described in PR 
[#3756|https://github.com/apache/kafka/pull/3756] in our production 
environment. Hence, we revised the authorization mechanism; our revision 
includes the following optimizations:

1. Build a cache for authorization results, which avoids recomputing them. If 
the same result has already been computed, it is fetched from the result cache 
rather than computed again.
2. Unlike PR 3756, when we build the authorization result cache, we key it by 
resource first. This way, authorization results are recomputed only when the 
authorization of a specific resource changes. Compared to PR 3756, this can 
noticeably reduce the frequency of recomputation.
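Optimization 2 can be sketched as a cache whose outer key is the resource, so an ACL change invalidates only that resource's entries. `ResourceKeyedAuthCache`, its nested `Authorizer` interface, and the `principal|operation` key format are assumptions for illustration, not the design from the PR. The sketch is also not thread-safe, which a broker-side cache would need to be.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical authorization-result cache keyed first by resource.
class ResourceKeyedAuthCache {
    interface Authorizer {
        boolean compute(String principal, String operation, String resource);
    }

    private final Authorizer delegate;
    // resource -> (principal + "|" + operation) -> cached decision
    private final Map<String, Map<String, Boolean>> cache = new HashMap<>();
    int computations = 0;

    ResourceKeyedAuthCache(Authorizer delegate) {
        this.delegate = Objects.requireNonNull(delegate);
    }

    boolean authorize(String principal, String operation, String resource) {
        Map<String, Boolean> perResource = cache.computeIfAbsent(resource, r -> new HashMap<>());
        String key = principal + "|" + operation;
        Boolean cached = perResource.get(key);
        if (cached != null) {
            return cached; // reuse a previously computed decision
        }
        computations++;
        boolean result = delegate.compute(principal, operation, resource);
        perResource.put(key, result);
        return result;
    }

    // Call when the ACLs of one resource change: only its entries are dropped.
    void onAclChange(String resource) {
        cache.remove(resource);
    }
}
```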


