Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-29 Thread John Roesler
Hi Navinder, 

Thanks for the ping. Yes, that all sounds right to me. The name 
“RESTORING_GLOBAL” sounds fine, too. 

I think as far as warnings go, we’d just propose to mention it in the javadoc 
of the relevant methods that the given topics should be compacted. 
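For illustration, here is a minimal sketch of creating a compacted topic for a 
global store with the Admin client; the topic name, partition/replica counts, 
and bootstrap address below are just placeholders:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class CreateCompactedGlobalTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // The source topic of a global store should use log
                // compaction; otherwise retention can delete records and a
                // rebuilt global store would silently miss data.
                NewTopic topic = new NewTopic("global-store-topic", 1, (short) 1)
                        .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }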

Thanks!
-John

On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote:
> Gentle ping.
> 
> ~ Navinder
> On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar 
>  wrote:  
>  
>   
> Thanks Matthias & John, 
> 
> 
> 
> I am glad we are converging towards an understanding. So, to summarize, 
> we will still treat this change as a KIP, and instead of providing a reset 
> strategy, we will clean up, reset to earliest, and rebuild the state. 
> When we hit the exception and we are rebuilding the state, we will stop all 
> processing and change the state of KafkaStreams to something like 
> “RESTORING_GLOBAL” or the like. 
> 
> How do we plan to educate users on the undesired effects of using 
> non-compacted global topics? (via the KIP itself?)
> 
> +1 on leaving the KTable behavior change, the reset policy for global 
> stores, and connecting processors to global stores for a later stage, when 
> demanded.
> 
> Regards,
> Navinder
>     On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax 
>  wrote:  
>  
>  Your observation is correct. Connecting (regular) stores to processors
> is necessary to "merge" sub-topologies into single ones if a store is
> shared. -- For global stores, the structure of the program does not
> change and thus connecting processors to global stores is not required.
> 
> Also given our experience with restoring regular state stores (ie,
> partial processing of tasks that don't need restore), it seems better to
> pause processing and move all CPU and network resources to the global
> thread to rebuild the global store as soon as possible instead of
> potentially slowing down the restore in order to make progress on some
> tasks.
> 
> Of course, if we collect real world experience and it becomes an issue,
> we could still try to change it?
> 
> 
> -Matthias
> 
> 
> On 8/18/20 3:31 PM, John Roesler wrote:
> > Thanks Matthias,
> > 
> > Sounds good. I'm on board with no public API change and just
> > recovering instead of crashing.
> > 
> > Also, to be clear, I wouldn't drag KTables into it; I was
> > just trying to wrap my head around the congruity of our
> > choice for GlobalKTable with respect to KTable.
> > 
> > I agree that whatever we decide to do would probably also
> > resolve KAFKA-7380.
> > 
> > Moving on to discuss the behavior change, I'm wondering if
> > we really need to block all the StreamThreads. It seems like
> > we only need to prevent processing on any task that's
> > connected to the GlobalStore. 
> > 
> > I just took a look at the topology building code, and it
> > actually seems that connections to global stores don't need
> > to be declared. That's a bummer, since it means that we
> > really do have to stop all processing while the global
> > thread catches up.
> > 
> > Changing this seems like it'd be out of scope right now, but
> > I bring it up in case I'm wrong and it actually is possible
> > to know which specific tasks need to be synchronized with
> > which global state stores. If we could know that, then we'd
> > only have to block some of the tasks, not all of the
> > threads.
> > 
> > Thanks,
> > -John
> > 
> > 
> > On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote:
> >> Thanks for the discussion.
> >>
> >> I agree that this KIP is justified in any case -- even if we don't
> >> change public API, as the change in behavior is significant.
> >>
> >> A better documentation for cleanup policy is always good (even if I am
> >> not aware of any concrete complaints atm that users were not aware of
> >> the implications). Of course, for a regular KTable, one can
> >> enable/disable the source-topic-changelog optimization and thus can use
>> a non-compacted topic for this case, which is quite different from
>> global stores/tables; so maybe it's worth pointing out this difference
>> explicitly.
> >>
>> As mentioned before, the main purpose of the original Jira was to avoid
>> the crash situation and instead allow for auto-recovery, while it was an
>> open question if it makes sense / would be useful to allow users to
>> specify a custom reset policy instead of using a hard-coded "earliest"
>> strategy. -- It seems it's still unclear if it would be useful, and thus
>> it might be best to not add it for now -- we can still add it later if
>> there are concrete use-cases that need this feature.
> >>
> >> @John: I actually agree that it's also questionable to allow a custom
> >> reset policy for KTables... Not sure if we want to drag this question
> >> into this KIP though?
> >>
>> So it seems we all agree that we actually don't need any public API
> >> changes, but we only want to avoid crashing?
> >>
> >> For this case, to preserve the current behavior that guarant

Request to subscribe to kafka mailing list

2020-08-29 Thread SaiTeja Ramisetty
Regards,
SaiTeja - Data Engineer


Permission to create KIP

2020-08-29 Thread sorin
Hi guys, I just wanted to propose an addition to the Consumer API: a new 
poll method which would also accept a collection of paused partitions and 
automatically perform the actions needed to resume serving messages for 
the provided partitions. If this is not the way to go for making such 
proposals, please suggest other ways.
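For context, here is a rough sketch of how this looks with the existing 
consumer API, where resume() has to be called separately before poll(); the 
proposed method would presumably fold the resume step into poll itself (the 
helper name below is just an illustration):

    import java.time.Duration;
    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PausedPartitionsPollSketch {
        // Today this is a two-step dance: resume the partitions first, then
        // poll. The proposal above is a poll variant that accepts the
        // partitions and performs the resume internally.
        static ConsumerRecords<String, String> pollWithResume(
                KafkaConsumer<String, String> consumer,
                Collection<TopicPartition> pausedPartitionsToServe) {
            consumer.resume(pausedPartitionsToServe);
            return consumer.poll(Duration.ofMillis(100));
        }
    }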


Thanks!



Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-08-29 Thread Unmesh Joshi
>>>Can you repeat your questions about broker leases?

The LeaseStartTimeMs is expected to be the broker's
'System.currentTimeMillis()' at the point of the request. The active
controller will add its lease period to this in order to compute the
LeaseEndTimeMs.

I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a bit
confusing. A monotonic clock (System.nanoTime) on the active controller
should be used to track leases.
(For example,
https://issues.apache.org/jira/browse/ZOOKEEPER-1616 and
https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
)

Then we would not need LeaseStartTimeMs. Instead of LeaseStartTimeMs, can we
call it LeaseTTL? The active controller can then calculate
LeaseEndTime = System.nanoTime() + LeaseTTL.
In this case we might just drop LeaseEndTimeMs from the response, as the
broker already knows the TTL and can send heartbeats at some fraction of it,
say every TTL/4 milliseconds (elapsed time on the broker measured by
System.nanoTime).
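To make the arithmetic concrete, here is a minimal sketch (in Java, with 
hypothetical class and method names, not the KIP's API) of lease tracking 
with a monotonic clock on the active controller and the TTL/4 heartbeat 
interval on the broker side:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch: the active controller tracks lease expiry with
    // System.nanoTime(), so wall-clock jumps on either side do not matter.
    public class ControllerLeaseTable {
        private final long leaseTtlMs;
        private final Map<Integer, Long> leaseEndNanosByBrokerId =
                new ConcurrentHashMap<>();

        public ControllerLeaseTable(long leaseTtlMs) {
            this.leaseTtlMs = leaseTtlMs;
        }

        // Called on registration and on every heartbeat:
        // LeaseEndTime = now + TTL.
        public void renew(int brokerId) {
            long leaseEnd = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(leaseTtlMs);
            leaseEndNanosByBrokerId.put(brokerId, leaseEnd);
        }

        public boolean isExpired(int brokerId) {
            Long leaseEnd = leaseEndNanosByBrokerId.get(brokerId);
            return leaseEnd == null || System.nanoTime() - leaseEnd > 0;
        }

        // The broker only needs the TTL: it can heartbeat at some fraction
        // of it, e.g. every leaseTtlMs / 4 milliseconds of its own elapsed
        // time (also measured with System.nanoTime on the broker).
        public long suggestedHeartbeatIntervalMs() {
            return leaseTtlMs / 4;
        }
    }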

I have built a prototype to demonstrate this, as follows:
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala

The Kip631Controller itself depends on a Consensus module, to demonstrate
what possible interactions with the consensus module will look like
(the Consensus module can really be pluggable, with an API to allow reading
the replicated log up to the HighWaterMark).

It has an implementation of LeaseTracker
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala
to demonstrate LeaseTracker's interaction with the consensus module.

The implementation has the following aspects:
1. The lease tracking happens only on the active controller (raft leader)
2. Once the lease expires, it needs to propose and commit a FenceBroker
record for that lease.
3. In case of active controller failure, the lease will be tracked by the
newly elected raft leader. The new raft leader starts the lease timer again
(as implemented in the onBecomingLeader method of
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
), in effect extending the lease by the time spent in the leader election
and whatever time had elapsed on the old leader.
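As a rough illustration of points 1-3, here is a sketch that builds on the 
hypothetical ControllerLeaseTable above; FenceBrokerRecord and Consensus are 
simplified stand-ins for the prototype's types, not the KIP's exact API:

    // Hypothetical sketch of the lease-expiry path from points 1-3 above,
    // reusing the ControllerLeaseTable sketched earlier in this mail.
    public class LeaseExpiryCheck {
        interface Consensus {
            // Propose a record and block until the quorum commits it.
            void proposeAndCommit(Object record);
        }

        static final class FenceBrokerRecord {
            final int brokerId;
            FenceBrokerRecord(int brokerId) {
                this.brokerId = brokerId;
            }
        }

        private final ControllerLeaseTable leases;
        private final Consensus consensus;
        private volatile boolean isActiveController;

        LeaseExpiryCheck(ControllerLeaseTable leases, Consensus consensus) {
            this.leases = leases;
            this.consensus = consensus;
        }

        // 1. Only the active controller (raft leader) checks for expiry.
        // 2. An expired lease results in a committed FenceBroker record.
        void checkExpiredLeases(Iterable<Integer> brokerIds) {
            if (!isActiveController) {
                return;
            }
            for (int brokerId : brokerIds) {
                if (leases.isExpired(brokerId)) {
                    consensus.proposeAndCommit(new FenceBrokerRecord(brokerId));
                }
            }
        }

        // 3. On becoming leader, restart the lease timers, in effect
        //    extending each lease by the election time plus whatever had
        //    elapsed on the old leader.
        void onBecomingLeader(Iterable<Integer> brokerIds) {
            isActiveController = true;
            for (int brokerId : brokerIds) {
                leases.renew(brokerId);
            }
        }
    }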

There are working tests for this implementation here.
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
and an end to end test here
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala

>> I'm not sure what you mean by "de-duplication of the broker."  Can you
give a little more context?
Apologies for using the confusing term deduplication. I meant broker id
conflict.
As you can see in handleRequest of the KIP631Controller prototype,
the duplicate broker id needs to be detected before the BrokerRecord is
submitted to the raft module.
Also, as implemented in the prototype, the KIP631Controller is single
threaded, handling requests one at a time (an example of
https://martinfowler.com/articles/patterns-of-distributed-systems/singular-update-queue.html
).
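Here is a hedged sketch of that broker-id-conflict check in a single-threaded 
handler; the request and record shapes are simplified stand-ins, not the 
KIP's exact schemas:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Hypothetical sketch: registration requests are handled one at a time
    // off a queue (a singular update queue), so the broker-id check and the
    // proposal to the raft module cannot race with each other.
    public class BrokerRegistrationHandler implements Runnable {
        public static final class RegisterBrokerRequest {
            final int brokerId;
            final String host;
            RegisterBrokerRequest(int brokerId, String host) {
                this.brokerId = brokerId;
                this.host = host;
            }
        }

        private final BlockingQueue<RegisterBrokerRequest> requests =
                new ArrayBlockingQueue<>(1024);
        // In-memory view of brokers whose records are committed or in flight.
        private final Map<Integer, String> knownBrokers = new HashMap<>();

        public boolean submit(RegisterBrokerRequest request) {
            return requests.offer(request);
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    RegisterBrokerRequest request = requests.take();
                    if (knownBrokers.containsKey(request.brokerId)) {
                        // Duplicate broker id: reject before anything is
                        // proposed to raft.
                        continue;
                    }
                    knownBrokers.put(request.brokerId, request.host);
                    // Here the controller would propose a BrokerRecord to the
                    // raft module and wait for the commit before replying.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }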

Thanks,
Unmesh

On Sat, Aug 29, 2020 at 10:49 AM Colin McCabe  wrote:

> On Fri, Aug 28, 2020, at 19:36, Unmesh Joshi wrote:
> > Hi Colin,
> >
> > There were a few questions I had...
>
> Hi Unmesh,
>
> Thanks for the response.
>
> >
> > 1. Were my comments on the broker lease implementation (and corresponding
> > prototype) appropriate and do we need to change the KIP
> > description accordingly?
> >
>
> Can you repeat your questions about broker leases?
>
> >
> > 2. How will broker epochs be generated? I am assuming it can be the
> > committed log offset (like zxid?)
> >
>
> There isn't any need to use a log offset.  We can just look at an
> in-memory hash table and see what the latest number is, and add one, to
> generate a new broker epoch.
>
> >
> > 3. How will producer registration happen? I am assuming it should be
> > similar to broker registration, with a similar way to generate producer
> id.
> >
>
> For the EOS stuff, we will need a few new RPCs to the controller.  I think
> we should do that in a follow-on KIP, though, since this one is already
> pretty big.
>
> >
> > 4. Because we expose Raft log to all the brokers, any de-duplication of
> the
> > broker needs to happen before the requests are proposed to Raft. For this
> > the controller needs to be single threaded, and should do validation
> > against the in-process or pending requests and the final state. I read a
> > mention of this in the responses in this thread. Will it be useful to
>