[jira] [Created] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

2021-04-01 Thread radai rosenblatt (Jira)
radai rosenblatt created KAFKA-12605:


 Summary: kafka consumer churns through buffer memory iterating 
over records
 Key: KAFKA-12605
 URL: https://issues.apache.org/jira/browse/KAFKA-12605
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.7.0
Reporter: radai rosenblatt


we recently conducted an analysis of memory allocations by the kafka consumer and 
found a significant number of buffers graduating out of the young gen, 
causing GC load.

 

these are the buffers used to gunzip record batches in the consumer when 
polling. since the same iterator (and underlying streams and buffers) is 
likely to live through several poll() cycles, these buffers graduate out of the 
young gen and cause issues.

 

see attached memory allocation flame graph.

 

the code causing this is in CompressionType.GZIP (taken from current trunk):
{code:java}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
    try {
        // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
        // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
        // number of bytes (potentially a single byte)
        return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
                16 * 1024);
    } catch (Exception e) {
        throw new KafkaException(e);
    }
}{code}
it allocates two buffers (8 KB and 16 KB) even though a BufferSupplier is available 
to attempt re-use.

 

i believe it is possible to actually get both those buffers from the supplier, 
and return them when iteration over the record batch is done. 

doing so will require subclassing BufferedInputStream and GZIPInputStream (or 
its parent class) to allow supplying external buffers to them. also some 
lifecycle hook would be needed to return said buffers to the pool when 
iteration is done.
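A minimal, self-contained sketch of the suggested direction (pure JDK, no Kafka classes; BufferPool and PooledBufferedInputStream are illustrative names, not real APIs): BufferedInputStream keeps its buffer in a protected field, so a subclass can swap in a pooled array after construction, and returning the array when iteration completes plays the role of the lifecycle hook.

```java
import java.io.*;
import java.util.ArrayDeque;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class PooledGunzip {
    // Trivial stand-in for Kafka's BufferSupplier: hand out and take back byte[]s.
    static final class BufferPool {
        private final ArrayDeque<byte[]> free = new ArrayDeque<>();
        private final int size;
        BufferPool(int size) { this.size = size; }
        byte[] take() { byte[] b = free.poll(); return b != null ? b : new byte[size]; }
        void give(byte[] b) { free.push(b); }
    }

    // BufferedInputStream exposes its buffer as a protected field, so a
    // subclass can substitute an externally owned (pooled) array.
    static final class PooledBufferedInputStream extends BufferedInputStream {
        PooledBufferedInputStream(InputStream in, byte[] pooled) {
            super(in, 1);      // minimal throwaway internal allocation
            this.buf = pooled; // reuse the pooled buffer instead of a fresh 16 KB one
        }
    }

    static byte[] gzip(byte[] plain) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) { gz.write(plain); }
        return bos.toByteArray();
    }

    // Decompress using a pooled output-side buffer; return it to the pool when done.
    static byte[] gunzip(byte[] compressed, BufferPool pool) throws IOException {
        byte[] pooled = pool.take();
        try (InputStream in = new PooledBufferedInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressed), 8 * 1024), pooled)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) != -1) out.write(chunk, 0, n);
            return out.toByteArray();
        } finally {
            pool.give(pooled); // "lifecycle hook": buffer survives for the next poll() cycle
        }
    }

    public static void main(String[] args) throws IOException {
        BufferPool pool = new BufferPool(16 * 1024);
        byte[] data = "hello kafka".getBytes("UTF-8");
        byte[] round1 = gunzip(gzip(data), pool);
        gunzip(gzip(data), pool); // second cycle reuses the same pooled array
        System.out.println(new String(round1, "UTF-8"));
    }
}
```

The 8 KB input-side buffer inside GZIPInputStream would need the same treatment (the ticket's "or its parent class" remark); this sketch only pools the outer 16 KB one.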

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9998) KafkaProducer.close(timeout) still may block indefinitely

2020-05-14 Thread radai rosenblatt (Jira)
radai rosenblatt created KAFKA-9998:
---

 Summary: KafkaProducer.close(timeout) still may block indefinitely
 Key: KAFKA-9998
 URL: https://issues.apache.org/jira/browse/KAFKA-9998
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.1
Reporter: radai rosenblatt


looking at KafkaProducer.close(timeout), we have this:
{code:java}
private void close(Duration timeout, boolean swallowException) {
    long timeoutMs = timeout.toMillis();
    if (timeoutMs < 0)
        throw new IllegalArgumentException("The timeout cannot be negative.");
    log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);

    // this will keep track of the first encountered exception
    AtomicReference firstException = new AtomicReference<>();
    boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
    if (timeoutMs > 0) {
        if (invokedFromCallback) {
            log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
                    "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
                    timeoutMs);
        } else {
            // Try to close gracefully.
            if (this.sender != null)
                this.sender.initiateClose();
            if (this.ioThread != null) {
                try {
                    this.ioThread.join(timeoutMs);   // <-- GRACEFUL (BOUNDED) JOIN
                } catch (InterruptedException t) {
                    firstException.compareAndSet(null, new InterruptException(t));
                    log.error("Interrupted while joining ioThread", t);
                }
            }
        }
    }

    if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
        log.info("Proceeding to force close the producer since pending requests could not be completed " +
                "within timeout {} ms.", timeoutMs);
        this.sender.forceClose();
        // Only join the sender thread when not calling from callback.
        if (!invokedFromCallback) {
            try {
                this.ioThread.join();   // <-- UNBOUNDED JOIN
            } catch (InterruptedException e) {
                firstException.compareAndSet(null, new InterruptException(e));
            }
        }
    }
    ...
}

{code}
specifically, in our case the ioThread was running a (very) long-running 
user-provided callback which was preventing the producer from closing within 
the given timeout.

 

I think the 2nd join() call should either be _VERY_ short (since we're already 
past the timeout at that stage) or should not happen at all.





[jira] [Created] (KAFKA-9855) dont waste memory allocating Struct and values objects for Schemas with no fields

2020-04-12 Thread radai rosenblatt (Jira)
radai rosenblatt created KAFKA-9855:
---

 Summary: dont waste memory allocating Struct and values objects 
for Schemas with no fields
 Key: KAFKA-9855
 URL: https://issues.apache.org/jira/browse/KAFKA-9855
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.4.1, 2.4.0
Reporter: radai rosenblatt
Assignee: radai rosenblatt


at the time of this writing there are 6 schemas in kafka APIs with no fields - 
3 versions each of LIST_GROUPS and API_VERSIONS.

under some workloads this may result in the creation of a lot of Struct objects 
with an Object[0] for values when deserializing those requests from the wire.

in one particular heap dump we've found a significant amount of heap space 
wasted on creating such objects.
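An illustrative sketch of one possible fix (the Schema/Struct classes below are trivial stand-ins, not Kafka's real ones): a zero-field schema can only ever deserialize to one distinct value, so a per-schema cached singleton with a shared empty values array avoids the Object[0] churn described above.

```java
import java.util.HashMap;
import java.util.Map;

public class EmptyStructCache {
    static final Object[] NO_VALUES = new Object[0]; // shared, never mutated

    static final class Schema { final int numFields; Schema(int n) { numFields = n; } }
    static final class Struct {
        final Schema schema; final Object[] values;
        Struct(Schema schema, Object[] values) { this.schema = schema; this.values = values; }
    }

    private static final Map<Schema, Struct> EMPTY_CACHE = new HashMap<>();

    static Struct read(Schema schema) {
        if (schema.numFields == 0) // e.g. LIST_GROUPS / API_VERSIONS requests
            return EMPTY_CACHE.computeIfAbsent(schema, s -> new Struct(s, NO_VALUES));
        return new Struct(schema, new Object[schema.numFields]); // normal path
    }

    public static void main(String[] args) {
        Schema apiVersions = new Schema(0);
        // Deserializing the same zero-field request twice yields one shared object.
        System.out.println(read(apiVersions) == read(apiVersions)); // true
    }
}
```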





Re: CompletableFuture?

2020-01-17 Thread radai
I'm currently building my own CompletableFutures by using the callbacks.
While I won't have the time to do this myself, I still think it's a
great idea and would prevent more people from having to do what I'm
currently doing.
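The callback-to-future wrapping described here looks roughly like the following sketch (CallbackSender is a stand-in for KafkaProducer.send(record, Callback), since the real client isn't assumed):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class FutureAdapter {
    // Stand-in for any callback-style send API.
    interface CallbackSender<R, M> {
        void send(R record, BiConsumer<M, Exception> callback);
    }

    // Adapt the callback into a CompletableFuture: complete on success,
    // completeExceptionally on failure.
    static <R, M> CompletableFuture<M> sendAsync(CallbackSender<R, M> sender, R record) {
        CompletableFuture<M> future = new CompletableFuture<>();
        sender.send(record, (metadata, exception) -> {
            if (exception != null) future.completeExceptionally(exception);
            else future.complete(metadata);
        });
        return future;
    }

    public static void main(String[] args) {
        // Fake "producer" that acks synchronously with an offset string.
        CallbackSender<String, String> fake = (rec, cb) -> cb.accept("offset-0", null);
        sendAsync(fake, "hello").thenAccept(md -> System.out.println("acked: " + md));
    }
}
```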

On Wed, Jan 15, 2020 at 6:57 AM Vamsi Subahsh  wrote:
>
> Hi,
>
> I'm interested in picking this up as I have already worked on internal code
> bases to make a wrapper on current Api to make it expose CompletableFuture
> (using callbacks in the current api).
>
> Could you give me comment/edit access to the confluence doc, I can write up
> the new api and the logic proposal?
>
> Regards,
> Vamsi Subhash
>
>
>
> On Wed, 15 Jan 2020 at 19:59, Ismael Juma  wrote:
>
> > Good question. I have a draft KIP for the producer change:
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-XXX%3A+Return+CompletableFuture+from+KafkaProducer.send
> >
> > I was still debating which was the best path forward (i.e. what should be
> > in rejected alternatives versus the actual proposal). Feedback is welcome.
> > You're also welcome to take over the KIP if you have the cycles and
> > interest.
> >
> > Ismael
> >
> > On Tue, Jan 14, 2020 at 8:27 PM radai  wrote:
> >
> > > Hi
> > >
> > > With kip-118 (Drop Support for Java 7) officially done, is there a
> > > timeline replacing usage of "plain" Futures with java 8
> > > CompletableFutures?
> > >
> > > kafka 2.0 was mentioned at some point as a possible target for this ...
> > >
> >


CompletableFuture?

2020-01-14 Thread radai
Hi

With kip-118 (Drop Support for Java 7) officially done, is there a
timeline replacing usage of "plain" Futures with java 8
CompletableFutures?

kafka 2.0 was mentioned at some point as a possible target for this ...


Re: [VOTE] KIP-514 Add a bounded flush() API to Kafka Producer

2019-12-13 Thread radai
i also have a PR :-)
https://github.com/apache/kafka/pull/7569

On Thu, Dec 12, 2019 at 9:50 PM Gwen Shapira  wrote:
>
> You got 3 binding votes (Joel, Harsha, Ismael) - the vote passed on Nov 7.
>
> Happy hacking!
>
> On Thu, Dec 12, 2019 at 11:35 AM radai  wrote:
> >
> > so can we call this passed ?
> >
> > On Thu, Nov 7, 2019 at 7:43 AM Satish Duggana  
> > wrote:
> > >
> > > +1 (non-binding)
> > >
> > > On Thu, Nov 7, 2019 at 8:58 PM Ismael Juma  wrote:
> > > >
> > > > +1 (binding)
> > > >
> > > > On Thu, Oct 24, 2019 at 9:33 PM radai  
> > > > wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I'd like to initiate a vote on KIP-514.
> > > > >
> > > > > links:
> > > > > the kip -
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
> > > > > the PR - https://github.com/apache/kafka/pull/7569
> > > > >
> > > > > Thank you
> > > > >


Re: [EXTERNAL] Re: [DISCUSS] KIP-280: Enhanced log compaction

2019-12-12 Thread radai
may I suggest that if, under the "header" strategy, multiple records are
found with identical header values, they are ALL kept?
this would be useful in cases where users send larger payloads than
max record size to kafka and are forced to fragment them - by setting
the same header in all fragments it would become possible to properly
log-compact topics with such fragmented payloads.
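The fragmentation scheme suggested above might look like this sketch (Fragmenter and its Record holder are illustrative, not Kafka classes): every chunk of an oversized payload carries the same correlation header, which is what a header-based compaction strategy would key on to keep the whole set.

```java
import java.util.ArrayList;
import java.util.List;

public class Fragmenter {
    // Minimal record holder: the shared header value, the chunk bytes, and
    // the fragment index (so the consumer can reassemble in order).
    static final class Record {
        final String headerValue; final byte[] value; final int index;
        Record(String h, byte[] v, int i) { headerValue = h; value = v; index = i; }
    }

    static List<Record> fragment(byte[] payload, int maxRecordSize, String correlationId) {
        List<Record> out = new ArrayList<>();
        for (int off = 0, i = 0; off < payload.length; off += maxRecordSize, i++) {
            int len = Math.min(maxRecordSize, payload.length - off);
            byte[] chunk = new byte[len];
            System.arraycopy(payload, off, chunk, 0, len);
            out.add(new Record(correlationId, chunk, i)); // same header on every fragment
        }
        return out;
    }

    public static void main(String[] args) {
        List<Record> frags = fragment(new byte[2_500_000], 1_000_000, "msg-42");
        System.out.println(frags.size() + " fragments, shared header " + frags.get(0).headerValue);
        // prints: 3 fragments, shared header msg-42
    }
}
```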

On Tue, Nov 26, 2019 at 10:24 PM Senthilnathan Muthusamy
 wrote:
>
> Thanks Jun for confirming!
>
> I have updated the KIP (added recommendation section and special case in 
> handling LEO record for non-offset based compaction strategy). Please review 
> and let me know if you have any other feedback.
>
> Regards,
> Senthil
>
> -Original Message-
> From: Jun Rao 
> Sent: Tuesday, November 26, 2019 4:36 PM
> To: dev 
> Subject: [EXTERNAL] Re: [DISCUSS] KIP-280: Enhanced log compaction
>
> Hi, Senthil,
>
> Sorry for the delay.
>
> 51. It seems that we can just remove the last record from the batch, but 
> keeps the batch during compaction. The batch level metadata is enough to 
> preserve the log end offset.
>
> 53. Yes, your understanding is correct. So we could recommend users to set "
> max.compaction.lag.ms" properly if they care about deletes.
>
> Could you add both to the KIP?
>
> Thanks,
>
> Jun
>
>
> On Tue, Nov 26, 2019 at 5:09 AM Senthilnathan Muthusamy 
>  wrote:
>
> > Hi Gouzhang & Jun,
> >
> > Can one of you please confirm/respond to the below mail so that I will
> > go ahead and update the KIP and proceed.
> >
> > Thanks
> > Senthil
> >
> > - Senthil
> > 
> > From: Senthilnathan Muthusamy 
> > Sent: Wednesday, November 20, 2019 5:04:20 PM
> > To: dev@kafka.apache.org 
> > Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >
> > 
> >
> > Hi Gouzhang & Jun,
> >
> > Thanks for the details on the scenarios.
> >
> > #51 => thanks for the detailed example, Gouzhang. Won't followers
> > be syncing the LEO with the leader as well? If yes, always keeping the
> > last record (without compacting it under the non-offset strategies) would
> > work, and this is needed only if the new strategy ends up removing the
> > LEO record, right? Also I wasn't able to retrieve Jason's mail related to
> > creating an empty message... Can you please forward it if you have it?
> > Wondering how that can solve this particular issue, unless we create a
> > record for a random key that won't conflict with the producer/consumer keys
> > for that topic/partition.
> >
> > #53 => I see that this can happen with a low produce rate: a partition can
> > remain ineligible for compaction for an unbounded duration, during which
> > "delete.retention.ms" triggers and removes the tombstone record. If
> > that's the case (please correct me if I am missing any other
> > scenarios), then we can suggest that Kafka users set "segment.ms" &
> > "max.compaction.lag.ms" (as compaction won't happen on the active segment)
> > to be smaller than "delete.retention.ms", and that should address
> > this scenario, right?
> >
> > Thanks,
> > Senthil
> >
> > -Original Message-
> > From: Jun Rao 
> > Sent: Wednesday, November 13, 2019 9:31 AM
> > To: dev 
> > Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
> >
> > Hi, Seth,
> >
> > 51. The difference is that with the offset compaction strategy, the
> > message corresponding to the last offset is always the winning record
> > and will never be removed. But with the new strategies, it's now
> > possible that the message corresponding to the last offset is a losing
> > record and needs to be removed.
> >
> > 53. Similarly, with the offset compaction strategy, if we see a
> > non-tombstone record after a tombstone record, the non-tombstone
> > record is always the winning one. However, with the new strategies,
> > that non-tombstone record with a larger offset could be a losing
> > record. The question is then how do we retain the tombstone long
> > enough so that we could still recognize that the non-tombstone record 
> > should be ignored.
> >
> > Thanks,
> >
> > Jun
> >
> > -Original Message-
> > From: Guozhang Wang 
> > Sent: Tuesday, November 12, 2019 6:09 PM
> > To: dev 
> > Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
> >
> > Hello Senthil,
> >
> > Let me try to re-iterate on Jun's comments with some context here:
> >
> > 51: today with the offset-only compaction strategy, the last record of
> > the log (we call it the log-end-record, whose offset is
> > log-end-offset) would always be preserved and not compacted. This is
> > kinda important for replication since followers reason about the 
> > log-end-offset on the leader.
> > Consider this case: three replicas of a partition, leader 1 and
> > follower 2 and 3.
> >
> > Leader 1 has records a, b, c, d and d is the current last record of
> > the partition, the current log-end-offset is 3 (assuming record a's
> > offset is 0).
> > Follower 2 has replicated a, b, c, d; its log-end-offset is 3. Follower 3
> > has replicated a, b, c but not

Re: [VOTE] KIP-514 Add a bounded flush() API to Kafka Producer

2019-12-12 Thread radai
so can we call this passed ?

On Thu, Nov 7, 2019 at 7:43 AM Satish Duggana  wrote:
>
> +1 (non-binding)
>
> On Thu, Nov 7, 2019 at 8:58 PM Ismael Juma  wrote:
> >
> > +1 (binding)
> >
> > On Thu, Oct 24, 2019 at 9:33 PM radai  wrote:
> >
> > > Hello,
> > >
> > > I'd like to initiate a vote on KIP-514.
> > >
> > > links:
> > > the kip -
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
> > > the PR - https://github.com/apache/kafka/pull/7569
> > >
> > > Thank you
> > >


Re: Broker Interceptors

2019-12-06 Thread radai
to me this is an "API vs SPI" discussion.
pluggable broker bits should fall on the "SPI" side, where tighter
coupling is the price you pay for power and performance, the target
audience is small (and supposedly smarter), and compatibility breaks
are more common and accepted.

On Fri, Dec 6, 2019 at 8:39 AM Ismael Juma  wrote:
>
> Public API classes can be found here:
>
> https://kafka.apache.org/24/javadoc/overview-summary.html
>
> Everything else is internal.
>
> Ismael
>
> On Fri, Dec 6, 2019 at 8:20 AM Tom Bentley  wrote:
>
> > Hi Ismael,
> >
> > How come? They're public in the clients jar. I'm not doubting you, all I'm
> > really asking is "how should I have known this?"
> >
> > Tom
> >
> > On Fri, Dec 6, 2019 at 4:12 PM Ismael Juma  wrote:
> >
> > > AbstractRequest and AbstractResponse are currently internal classes.
> > >
> > > Ismael
> > >
> > > On Fri, Dec 6, 2019 at 1:56 AM Tom Bentley  wrote:
> > >
> > > > Hi,
> > > >
> > > > Couldn't this be done without exposing broker internals at the slightly
> > > > higher level of AbstractRequest and AbstractResponse? Those classes are
> > > > public. If the observer interface used Java default methods then
> > adding a
> > > > new request type would not break existing implementations. I'm thinking
> > > > something like this:
> > > >
> > > > ```
> > > > public interface RequestObserver {
> > > >     default void observeAny(RequestContext context, AbstractRequest request) {}
> > > >     default void observe(RequestContext context, MetadataRequest request) {
> > > >         observeAny(context, request);
> > > >     }
> > > >     default void observe(RequestContext context, ProduceRequest request) {
> > > >         observeAny(context, request);
> > > >     }
> > > >     default void observe(RequestContext context, FetchRequest request) {
> > > >         observeAny(context, request);
> > > >     }
> > > >     ...
> > > > ```
> > > >
> > > > And similar for a `ResponseObserver`. Request classes would implement
> > > this
> > > > method
> > > >
> > > > ```
> > > > public abstract void observe(RequestContext context, RequestObserver requestObserver);
> > > > ```
> > > >
> > > > where the implementation would look like this:
> > > >
> > > > ```
> > > > @Override
> > > > public void observe(RequestContext context, RequestObserver requestObserver) {
> > > > requestObserver.observe(context, this);
> > > > }
> > > > ```
> > > >
> > > > I think this sufficiently abstracted to allow KafkaApis.handle() and
> > > > sendResponse() to call observe() generically.
> > > >
> > > > Kind regards,
> > > >
> > > > Tom
> > > >
> > > > On Wed, Dec 4, 2019 at 6:59 PM Lincong Li 
> > > wrote:
> > > >
> > > > > Hi Thomas,
> > > > >
> > > > > Thanks for your interest in KIP-388. As Ignacio and Radai have
> > > mentioned,
> > > > > this
> > > > > <
> > > > >
> > > >
> > >
> > https://github.com/linkedin/kafka/commit/a378c8980af16e3c6d3f6550868ac0fd5a58682e
> > > > > >
> > > > > is our (LinkedIn's) implementation of KIP-388. The implementation and
> > > > > deployment of this broker-side observer has been working very well
> > for
> > > us
> > > > > by far. On the other hand, I totally agree with the longer-term
> > > concerns
> > > > > raised by other committers. However we still decided to implement the
> > > KIP
> > > > > idea as a hot fix in order to solve our immediate problem and meet
> > our
> > > > > business requirements.
> > > > >
> > > > > The "Rejected Alternatives for Kafka Audit" section at the end of
> > > KIP-388
> > > > > sheds some lights on the client-side auditor/interceptor/observer
> > > (sorry
> > > > > about the potential confusion caused by these words being used
> > > > > interchangeably).

Re: Broker Interceptors

2019-12-03 Thread radai
another note - the current status of the kip 388 reflects the latest
stage of the discussion process, which has devolved into a horrible
case study in over-engineering (reflected in the latest edits to the
kip wiki page) before we gave up. the actual commit represents
something much closer to the original kip-388 proposal.

On Tue, Dec 3, 2019 at 9:15 PM radai  wrote:
>
> yeah, we tried for this a while back (kip 388 -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-388%3A+Add+observer+interface+to+record+request+and+response)
>
> its implemented in our kafka repo (linked to above)
>
> On Tue, Dec 3, 2019 at 8:59 PM Ignacio Solis  wrote:
> >
> > At LinkedIn we run a style of "read-only" interceptor we call an observer.
> > We use this for usage monitoring.
> > https://github.com/linkedin/kafka/commit/a378c8980af16e3c6d3f6550868ac0fd5a58682e
> >
> > There is always a tension between exposing internals, creating stable
> > interfaces and performance.  It's understandable that upstream feels uneasy
> > about this type of change.
> >
> > For us, the observer has become an essential tool, so we're ok if we need
> > to update code.
> >
> > Personally, I would like to argue that a general interceptor framework
> > would have been valuable from the very beginning.  Since one didn't exist,
> > we've had to find one-off solutions for a few different problems.  A few
> > examples that may have fit in there include up/down convert formats,
> > authorizer, quotas, transaction coordinator, idempotent producer, etc.  Ok,
> > ok, I'm overreaching, but you get the idea.  The chain of processes that a
> > message/request goes through are basically interceptors, and we have
> > decided that instead of a generic framework, we prefer to do the one-offs.
> >
> > Nacho
> >
> >
> > On Tue, Dec 3, 2019 at 8:03 AM Ismael Juma  wrote:
> >
> > > The main challenge is doing this without exposing a bunch of internal
> > > classes. I haven't seen a proposal that handles that aspect well so far.
> > >
> > > Ismael
> > >
> > > On Tue, Dec 3, 2019 at 7:21 AM Sönke Liebau
> > >  wrote:
> > >
> > > > Hi Thomas,
> > > >
> > > > I think that idea is worth looking at. As you say, if no interceptor is
> > > > configured then the performance overhead should be negligible. Basically
> > > it
> > > > is then up to the user to decide if he wants to take the performance
> > > > hit.
> > > > We should make sure to think about monitoring capabilities like time
> > > spent
> > > > in the interceptor for records etc.
> > > >
> > > > The most obvious use case I think is server side schema validation, 
> > > > which
> > > > Confluent are also offering as part of their commercial product, but
> > > other
> > > > ideas come to mind as well.
> > > >
> > > > Best regards,
> > > > Sönke
> > > >
> > > > Thomas Aley  schrieb am Di., 3. Dez. 2019, 10:45:
> > > >
> > > > > Hi M. Manna,
> > > > >
> > > > > Thank you for your feedback, any and all thoughts on this are
> > > appreciated
> > > > > from the community.
> > > > >
> > > > > I think it is important to distinguish that there are two parts to
> > > this.
> > > > > One would be a server side interceptor framework and the other would 
> > > > > be
> > > > > the interceptor implementations themselves.
> > > > >
> > > > > The idea would be that the Interceptor framework manifests as a plug
> > > > point
> > > > > in the request/response paths that by itself has negligible 
> > > > > performance
> > > > > impact as without an interceptor registered in the framework it is
> > > > > essentially a no-op. This way the out-the-box behavior of the Kafka
> > > > broker
> > > > > remains essentially unchanged, it is only if the cluster administrator
> > > > > registers an interceptor into the framework that the path of a record
> > > is
> > > > > intercepted. This is much like the already accepted and implemented
> > > > client
> > > > > interceptors - the capability exists and it is an opt-in feature.
> > > > >
> > > > > As with the client interceptors and indeed interception in general, 
> > > > > the

Re: Broker Interceptors

2019-12-03 Thread radai
yeah, we tried for this a while back (kip 388 -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-388%3A+Add+observer+interface+to+record+request+and+response)

its implemented in our kafka repo (linked to above)

On Tue, Dec 3, 2019 at 8:59 PM Ignacio Solis  wrote:
>
> At LinkedIn we run a style of "read-only" interceptor we call an observer.
> We use this for usage monitoring.
> https://github.com/linkedin/kafka/commit/a378c8980af16e3c6d3f6550868ac0fd5a58682e
>
> There is always a tension between exposing internals, creating stable
> interfaces and performance.  It's understandable that upstream feels uneasy
> about this type of change.
>
> For us, the observer has become an essential tool, so we're ok if we need
> to update code.
>
> Personally, I would like to argue that a general interceptor framework
> would have been valuable from the very beginning.  Since one didn't exist,
> we've had to find one-off solutions for a few different problems.  A few
> examples that may have fit in there include up/down convert formats,
> authorizer, quotas, transaction coordinator, idempotent producer, etc.  Ok,
> ok, I'm overreaching, but you get the idea.  The chain of processes that a
> message/request goes through are basically interceptors, and we have
> decided that instead of a generic framework, we prefer to do the one-offs.
>
> Nacho
>
>
> On Tue, Dec 3, 2019 at 8:03 AM Ismael Juma  wrote:
>
> > The main challenge is doing this without exposing a bunch of internal
> > classes. I haven't seen a proposal that handles that aspect well so far.
> >
> > Ismael
> >
> > On Tue, Dec 3, 2019 at 7:21 AM Sönke Liebau
> >  wrote:
> >
> > > Hi Thomas,
> > >
> > > I think that idea is worth looking at. As you say, if no interceptor is
> > > configured then the performance overhead should be negligible. Basically
> > it
> > > is then up to the user to decide if he wants to take the performance hit.
> > > We should make sure to think about monitoring capabilities like time
> > spent
> > > in the interceptor for records etc.
> > >
> > > The most obvious use case I think is server side schema validation, which
> > > Confluent are also offering as part of their commercial product, but
> > other
> > > ideas come to mind as well.
> > >
> > > Best regards,
> > > Sönke
> > >
> > > Thomas Aley  schrieb am Di., 3. Dez. 2019, 10:45:
> > >
> > > > Hi M. Manna,
> > > >
> > > > Thank you for your feedback, any and all thoughts on this are
> > appreciated
> > > > from the community.
> > > >
> > > > I think it is important to distinguish that there are two parts to
> > this.
> > > > One would be a server side interceptor framework and the other would be
> > > > the interceptor implementations themselves.
> > > >
> > > > The idea would be that the Interceptor framework manifests as a plug
> > > point
> > > > in the request/response paths that by itself has negligible performance
> > > > impact as without an interceptor registered in the framework it is
> > > > essentially a no-op. This way the out-the-box behavior of the Kafka
> > > broker
> > > > remains essentially unchanged, it is only if the cluster administrator
> > > > registers an interceptor into the framework that the path of a record
> > is
> > > > intercepted. This is much like the already accepted and implemented
> > > client
> > > > interceptors - the capability exists and it is an opt-in feature.
> > > >
> > > > As with the client interceptors and indeed interception in general, the
> > > > interceptor implementations need to be thoughtfully crafted to ensure
> > > > minimal performance impact. Yes the interceptor framework could tap
> > into
> > > > nearly everything but would only be tapping into the subset of APIs
> > that
> > > > the user wishes to intercept for their use case.
> > > >
> > > > Tom Aley
> > > > thomas.a...@ibm.com
> > > >
> > > >
> > > >
> > > > From:   "M. Manna" 
> > > > To: Kafka Users 
> > > > Cc: dev@kafka.apache.org
> > > > Date:   02/12/2019 11:31
> > > > Subject:[EXTERNAL] Re: Broker Interceptors
> > > >
> > > >
> > > >
> > > > Hi Tom,
> > > >
> > > > On Mon, 2 Dec 2019 at 09:41, Thomas Aley  wrote:
> > > >
> > > > > Hi Kafka community,
> > > > >
> > > > > I am hoping to get some feedback and thoughts about broker
> > > interceptors.
> > > > >
> > > > > KIP-42 Added Producer and Consumer interceptors which have provided
> > > > Kafka
> > > > > users the ability to collect client side metrics and trace the path
> > of
> > > > > individual messages end-to-end.
> > > > >
> > > > > This KIP also mentioned "Adding message interceptor on the broker
> > makes
> > > > a
> > > > > lot of sense, and will add more detail to monitoring. However, the
> > > > > proposal is to do it later in a separate KIP".
> > > > >
> > > > > One of the motivations for leading with client interceptors was to
> > gain
> > > > > experience and see how useable they are before tackling the server
> > side
> > > > > implementation which would ultimately "allow us 

Re: [VOTE] KIP-514 Add a bounded flush() API to Kafka Producer

2019-11-07 Thread radai
+1 (non binding, and another bump)

On Fri, Oct 25, 2019 at 11:51 AM Harsha Chintalapani  wrote:
>
> +1 (binding)
> -Harsha
>
>
> On Fri, Oct 25 2019 at 11:01 AM,  wrote:
>
> >
> > +1
> >
> > On Thu, Oct 24, 2019 at 9:33 PM radai  wrote:
> >
> > Hello,
> >
> > >
> >
> > I'd like to initiate a vote on KIP-514.
> >
> > >
> >
> > links:
> > the kip -
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
> > the PR - https://github.com/apache/kafka/pull/7569
> >
> > >
> >
> > Thank you
> >
> > >
> >


Re: [DISCUSS] KIP-514: Add a bounded flush() API to Kafka Producer

2019-10-24 Thread radai
vote thread created.

On Mon, Oct 21, 2019 at 9:58 AM Colin McCabe  wrote:
>
> Hi Radai,
>
> It seems reasonable to me.
>
> best,
> Colin
>
>
> On Mon, Oct 21, 2019, at 09:52, radai wrote:
> > yet another bump.
> >
> > can we please have a vote if there are no objections ?
> >
> > On Wed, Sep 25, 2019 at 1:28 PM radai  wrote:
> > >
> > > bump.
> > >
> > > so if no more concerns, can we move to a vote on this ?
> > >
> > > On Fri, Sep 13, 2019 at 10:05 AM radai  wrote:
> > > >
> > > > we have a lot of processes that need a time-bounded checkpoint logic.
> > > >
> > > > the standard use case is some consume-process-produce logic of the
> > > > following form:
> > > >
> > > > while (alive) {
> > > >data = consumer.poll()
> > > >output = process(data)
> > > >producer.send(output)
> > > >
> > > >if (System.millis() > nextCheckpoint) {
> > > >   nextCheckpoint = System.millis() + 5 minutes;
> > > >   if (!producer.flush(30 seconds) || !consumer.commitSync(30 seconds)) {
> > > >  //unable to checkpoint within timeout, die (or at least raise some alarm)
> > > >   }
> > > >}
> > > > }
> > > >
> > > > an unbounded flush could cause the consumer to be considered dead and
> > > > send the whole app into rebalance storms, so we'd really love to be
> > > > able to put a time bound on it.
> > > >
> > > > im also fine with rethrowing InterruptedException
> > > >
> > > > On Thu, Sep 12, 2019 at 3:46 PM Colin McCabe  wrote:
> > > > >
> > > > > Hi Radai,
> > > > >
> > > > > Thanks for the KIP.  Sounds interesting.  I assume that if an 
> > > > > InterruptedException were caught, that would be rethrown, rather than 
> > > > > returning false?  It might be good to specify that.  Can you give an 
> > > > > example of how this would be used?
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Thu, Sep 12, 2019, at 15:26, radai wrote:
> > > > > > bump.
> > > > > >
> > > > > > if no one has any comments on this can we initiate a vote?
> > > > > >
> > > > > > On Tue, Sep 3, 2019 at 8:28 AM KUN DU  wrote:
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I would like to start discussion on KIP-514 that proposes we add a
> > > > > > > bounded flush() API to producer.
> > > > > > >
> > > > > > > Link to the KIP:
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
> > > > > > >
> > > > > > > Suggestions and feedback are welcome!
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Kun
> > > > > >
> >


[VOTE] KIP-514 Add a bounded flush() API to Kafka Producer

2019-10-24 Thread radai
Hello,

I'd like to initiate a vote on KIP-514.

links:
the kip - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
the PR - https://github.com/apache/kafka/pull/7569

Thank you
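For illustration, the checkpoint loop this KIP enables might look like the following sketch, assuming the proposed boolean flush(timeout) contract; FakeProducer is a stand-in that runs sends on a background thread, not the real KafkaProducer.

```java
import java.time.Duration;
import java.util.concurrent.*;

public class BoundedFlushDemo {
    // Stand-in producer: sends run on a background io thread; flush(timeout)
    // returns true only if the in-flight send completed within the bound.
    static final class FakeProducer {
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        private Future<?> lastSend = CompletableFuture.completedFuture(null);
        void send(String record) { lastSend = io.submit(() -> { /* network I/O */ }); }
        boolean flush(Duration timeout) {
            try {
                lastSend.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // caller decides: alarm, die, retry
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        void close() { io.shutdownNow(); }
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        producer.send("checkpoint-record");
        // The bounded flush keeps the checkpoint from blocking indefinitely,
        // so the consumer side is never considered dead by the group.
        if (!producer.flush(Duration.ofSeconds(30))) {
            System.out.println("checkpoint timed out");
        } else {
            System.out.println("checkpoint ok");
        }
        producer.close();
    }
}
```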


Re: [DISCUSS] KIP-514: Add a bounded flush() API to Kafka Producer

2019-10-21 Thread radai
yet another bump.

can we please have a vote if there are no objections ?

On Wed, Sep 25, 2019 at 1:28 PM radai  wrote:
>
> bump.
>
> so if no more concerns, can we move to a vote on this ?
>
> On Fri, Sep 13, 2019 at 10:05 AM radai  wrote:
> >
> > we have a lot of processes that need a time-bounded checkpoint logic.
> >
> > the standard use case is some consume-process-produce logic of the
> > following form:
> >
> > while (alive) {
> >data = consumer.poll()
> >output = process(data)
> >producer.send(output)
> >
> >if (System.millis() > nextCheckpoint) {
> >   nextCheckpoint = System.millis() + 5 minutes;
> >   if (!producer.flush(30 seconds) || !consumer.commitSync(30 seconds)) {
> >  //unable to checkpoint within timeout, die (or at least raise
> > some alarm)
> >   }
> >}
> > }
> >
> > an unbounded flush could cause the consumer to be considered dead and
> > send the whole app into rebalance storms, so we'd really love to be
> > able to put a time bound on it.
> >
> > im also fine with rethrowing InterruptedException
> >
> > On Thu, Sep 12, 2019 at 3:46 PM Colin McCabe  wrote:
> > >
> > > Hi Radai,
> > >
> > > Thanks for the KIP.  Sounds interesting.  I assume that if an 
> > > InterruptedException were caught, that would be rethrown, rather than 
> > > returning false?  It might be good to specify that.  Can you give an 
> > > example of how this would be used?
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Thu, Sep 12, 2019, at 15:26, radai wrote:
> > > > bump.
> > > >
> > > > if no one has any comments on this can we initiate a vote?
> > > >
> > > > On Tue, Sep 3, 2019 at 8:28 AM KUN DU  wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > I would like to start discussion on KIP-514 that proposes we add a
> > > > > bounded flush() API to producer.
> > > > >
> > > > > Link to the KIP:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
> > > > >
> > > > > Suggestions and feedback are welcome!
> > > > >
> > > > > Thanks,
> > > > > Kun
> > > >


Re: [DISCUSS] KIP-514: Add a bounded flush() API to Kafka Producer

2019-09-25 Thread radai
bump.

so if no more concerns, can we move to a vote on this ?

On Fri, Sep 13, 2019 at 10:05 AM radai  wrote:
>
> we have a lot of processes that need a time-bounded checkpoint logic.
>
> the standard use case is some consume-process-produce logic of the
> following form:
>
> while (alive) {
>data = consumer.poll()
>output = process(data)
>producer.send(output)
>
>if (System.millis() > nextCheckpoint) {
>   nextCheckpoint = System.millis() + 5 minutes;
>   if (!producer.flush(30 seconds) || !consumer.commitSync(30 seconds)) {
>  //unable to checkpoint within timeout, die (or at least raise
> some alarm)
>   }
>}
> }
>
> an unbounded flush could cause the consumer to be considered dead and
> send the whole app into rebalance storms, so we'd really love to be
> able to put a time bound on it.
>
> I'm also fine with rethrowing InterruptedException
>
> On Thu, Sep 12, 2019 at 3:46 PM Colin McCabe  wrote:
> >
> > Hi Radai,
> >
> > Thanks for the KIP.  Sounds interesting.  I assume that if an 
> > InterruptedException were caught, that would be rethrown, rather than 
> > returning false?  It might be good to specify that.  Can you give an 
> > example of how this would be used?
> >
> > best,
> > Colin
> >
> >
> > On Thu, Sep 12, 2019, at 15:26, radai wrote:
> > > bump.
> > >
> > > if no one has any comments on this can we initiate a vote?
> > >
> > > On Tue, Sep 3, 2019 at 8:28 AM KUN DU  wrote:
> > > >
> > > > Hi,
> > > >
> > > > I would like to start discussion on KIP-514 that proposes we add a
> > > > bounded flush() API to producer.
> > > >
> > > > Link to the KIP:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
> > > >
> > > > Suggestions and feedback are welcome!
> > > >
> > > > Thanks,
> > > > Kun
> > >


Re: [DISCUSS] KIP-517: Add consumer metric indicating time between poll calls

2019-09-13 Thread radai
while you're at it, another metric that we have found to be useful is %
time spent in user code vs time spent in poll() (i.e. time between poll
calls / time inside poll calls) - the higher the % value, the more
likely it is that user code is the cause of performance bottlenecks.
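The suggested ratio can be accumulated with a tiny helper. This is a hypothetical sketch (the class and method names are illustrative, not part of the Kafka client): it records each poll() interval and derives the fraction of wall-clock time spent in user code between polls.

```java
// Hypothetical helper (not a Kafka client API): tracks the fraction of
// observed wall-clock time spent in user code (between poll() calls)
// versus inside poll() itself.
public class PollRatioTracker {
    private long inPollMs = 0;        // total time spent inside poll()
    private long betweenPollsMs = 0;  // total time spent in user code
    private long lastPollEndMs = -1;  // end of the previous poll, -1 if none

    // Record one poll() call that ran from startMs to endMs.
    public void recordPoll(long startMs, long endMs) {
        if (lastPollEndMs >= 0) {
            betweenPollsMs += startMs - lastPollEndMs; // gap = user code
        }
        inPollMs += endMs - startMs;
        lastPollEndMs = endMs;
    }

    // Fraction of observed time spent in user code; a value close to 1.0
    // suggests user processing, not the consumer, is the bottleneck.
    public double userCodeFraction() {
        long total = inPollMs + betweenPollsMs;
        return total == 0 ? 0.0 : (double) betweenPollsMs / total;
    }
}
```

In a real consumer loop one would call `recordPoll` with timestamps taken just before and after each `consumer.poll()`.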

On Fri, Sep 13, 2019 at 9:14 AM Kevin Lu  wrote:
>
> Hi All,
>
> Happy Friday! Bumping this. Any thoughts?
>
> Thanks.
>
> Regards,
> Kevin
>
> On Thu, Sep 5, 2019 at 9:35 AM Kevin Lu  wrote:
>
> > Hi All,
> >
> > I'd like to propose a new consumer metric that measures the time between
> > calls to poll() for use in issues related to hitting max.poll.interval.ms
> > due to long processing time.
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metric+indicating+time+between+poll+calls
> >
> > Please give it a read and let me know what you think.
> >
> > Thanks!
> >
> > Regards,
> > Kevin
> >


Re: [DISCUSS] KIP-514: Add a bounded flush() API to Kafka Producer

2019-09-13 Thread radai
we have a lot of processes that need a time-bounded checkpoint logic.

the standard use case is some consume-process-produce logic of the
following form:

while (alive) {
   data = consumer.poll()
   output = process(data)
   producer.send(output)

   if (System.millis() > nextCheckpoint) {
  nextCheckpoint = System.millis() + 5 minutes;
  if (!producer.flush(30 seconds) || !consumer.commitSync(30 seconds)) {
 //unable to checkpoint within timeout, die (or at least raise
some alarm)
  }
   }
}

an unbounded flush could cause the consumer to be considered dead and
send the whole app into rebalance storms, so we'd really love to be
able to put a time bound on it.

I'm also fine with rethrowing InterruptedException
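Until a flush(timeout) API exists, the bound can be approximated by running the blocking flush on a helper thread and capping only the caller's wait. The sketch below is illustrative (the class and method names are made up, and the Runnable stands in for `producer::flush`); note the caveat that on timeout the underlying flush keeps running in the background.

```java
import java.util.concurrent.*;

// A minimal sketch of approximating a time-bounded flush today, before a
// flush(timeout) API exists. Names are illustrative, not a real Kafka API.
public class BoundedFlush {
    // Runs flushAction (e.g. producer::flush) on the executor and waits at
    // most timeoutMs. Returns true if it completed in time, false otherwise.
    // Caveat: on timeout the flush itself keeps running in the background;
    // only the caller's wait is bounded.
    public static boolean tryFlush(Runnable flushAction, long timeoutMs,
                                   ExecutorService executor) {
        Future<?> f = executor.submit(flushAction);
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;                       // did not finish in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}
```

In the checkpoint loop above, a false return would trigger the "die or raise an alarm" branch instead of stalling the consumer past max.poll.interval.ms.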

On Thu, Sep 12, 2019 at 3:46 PM Colin McCabe  wrote:
>
> Hi Radai,
>
> Thanks for the KIP.  Sounds interesting.  I assume that if an 
> InterruptedException were caught, that would be rethrown, rather than 
> returning false?  It might be good to specify that.  Can you give an example 
> of how this would be used?
>
> best,
> Colin
>
>
> On Thu, Sep 12, 2019, at 15:26, radai wrote:
> > bump.
> >
> > if no one has any comments on this can we initiate a vote?
> >
> > On Tue, Sep 3, 2019 at 8:28 AM KUN DU  wrote:
> > >
> > > Hi,
> > >
> > > I would like to start discussion on KIP-514 that proposes we add a
> > > bounded flush() API to producer.
> > >
> > > Link to the KIP:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
> > >
> > > Suggestions and feedback are welcome!
> > >
> > > Thanks,
> > > Kun
> >


Re: [DISCUSS] KIP-514: Add a bounded flush() API to Kafka Producer

2019-09-12 Thread radai
bump.

if no one has any comments on this can we initiate a vote?

On Tue, Sep 3, 2019 at 8:28 AM KUN DU  wrote:
>
> Hi,
>
> I would like to start discussion on KIP-514 that proposes we add a
> bounded flush() API to producer.
>
> Link to the KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
>
> Suggestions and feedback are welcome!
>
> Thanks,
> Kun


[jira] [Created] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric

2019-05-14 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-8366:
---

 Summary: partitions of topics being deleted show up in the offline 
partitions metric
 Key: KAFKA-8366
 URL: https://issues.apache.org/jira/browse/KAFKA-8366
 Project: Kafka
  Issue Type: Improvement
Reporter: radai rosenblatt


i believe this is a bug.
offline partitions is a metric that indicates an error condition - lack of 
kafka availability.
as an artifact of how deletion is implemented, the partitions for a topic 
undergoing deletion will show up as offline, which just creates false-positive 
alerts.

if needed, maybe there should exist a separate "partitions to be deleted" 
sensor.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-391: Allow Producing with Offsets for Cluster Replication

2019-01-22 Thread radai
the kip-320 conflict can be resolved by saying that the leader broker
on the destination "stamps" its own local leader epoch on the incoming
msgs - meaning the offsets "transfer" but leader epochs do not.

On Mon, Jan 7, 2019 at 1:38 PM Edoardo Comar  wrote:
>
> Hi,
> I delayed starting the voting thread due to the festive period. I would
> like to start it this week.
> Has anyone any more feedback ?
>
> --
>
> Edoardo Comar
>
> IBM Event Streams
>
>
> Edoardo Comar  wrote on 13/12/2018 17:50:30:
>
> > From: Edoardo Comar 
> > To: dev@kafka.apache.org
> > Date: 13/12/2018 17:50
> > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for
> > Cluster Replication
> >
> > Hi,
> > as we haven't got any more feedback, we'd like to start a vote on
> KIP-391
> > on Monday
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-391%3A+Allow+Producing+with+Offsets+for+Cluster+Replication
> >
> > --
> >
> > Edoardo Comar
> >
> > IBM Event Streams
> > IBM UK Ltd, Hursley Park, SO21 2JN
> >
> >
> > Edoardo Comar/UK/IBM wrote on 10/12/2018 10:20:06:
> >
> > > From: Edoardo Comar/UK/IBM
> > > To: dev@kafka.apache.org
> > > Date: 10/12/2018 10:20
> > > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for
> > > Cluster Replication
> > >
> > > (shameless bump) any additional feedback is welcome ... thanks!
> > >
> > > Edoardo Comar  wrote on 27/11/2018 15:35:09:
> > >
> > > > From: Edoardo Comar 
> > > > To: dev@kafka.apache.org
> > > > Date: 27/11/2018 15:35
> > > > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for
> > > > Cluster Replication
> > > >
> > > > Hi Jason
> > > >
> > > > we envisioned the replicator to replicate the __consumer_offsets
> topic
> > too
> > > > (although without producing-with-offsets to it!).
> > > >
> > > > As there is no client-side implementation yet using the leader
> epoch,
> > > > we could not yet see the impact of writing to the destination
> cluster
> > > > __consumer_offsets records with an invalid leader epoch.
> > > >
> > > > Also, applications might still use external storage mechanism for
> > consumer
> > > > offsets where the leader_epoch is missing.
> > > >
> > > > Perhaps the replicator could - for the __consumer_offsets topic -
> just
> >
> > > > omit the leader_epoch field in the data sent to destination.
> > > >
> > > > What do you think ?
> > > >
> > > >
> > > > Jason Gustafson  wrote on 27/11/2018 00:09:56:
> > > >
> > > > > Another wrinkle to consider is KIP-320. If you are planning to
> > replicate
> > > > > __consumer_offsets directly, then you will have to account for
> > leader
> > > > epoch
> > > > > information which is stored with the committed offsets. But I
> cannot
> >
> > > > think
> > > > > how it would be possible to replicate the leader epoch information
>
> > in
> > > > > messages even if you can preserve offsets.
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Mon, Nov 26, 2018 at 1:16 PM Mayuresh Gharat
> > > > 
> > > > > wrote:
> > > > >
> > > > > > Hi Edoardo,
> > > > > >
> > > > > > Thanks a lot for the KIP.
> > > > > >  I have a few questions/suggestions in addition to what Radai
> has
> > > > mentioned
> > > > > > above :
> > > > > >
> > > > > >1. Is this meant only for 1:1 replication, for example one
> > Kafka
> > > > cluster
> > > > > >replicating to other, instead of having multiple Kafka
> clusters
> > > > > > mirroring
> > > > > >into one Kafka cluster?
> > > > > >2. Are we relying on exactly once produce in the replicator?
> If
> >
> > > > not, how

Re: [DISCUSS] KIP-391: Allow Producing with Offsets for Cluster Replication

2018-11-26 Thread radai
a few questions:

1. how do you handle possible duplications caused by the "special"
producer timing out/retrying? are you explicitly relying on the
"exactly once" sequencing?
2. what about the combination of log-compacted topics + replicator
downtime? by the time the replicator comes back up there might be
"holes" in the source offsets (some msgs might have been compacted
out) - how is that recoverable?
3. similarly, what if you try to fire up replication on a non-empty
source topic? does the KIP allow for offsets starting at some
arbitrary X > 0, or would this have to be designed in from the start?

and lastly, since this KIP seems to be designed for active-passive
failover (there can be no produce traffic except the replicator),
wouldn't a solution based on seeking to a time offset be more generic?
your producers could checkpoint the last (say, log append) timestamp of
records they've seen, and when restoring in the remote site seek to
those timestamps (which would be metadata in their committed offsets) -
assuming replication takes > 0 time you'd need to handle some dups,
but every kafka consumer setup needs to know how to handle those
anyway.
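The timestamp-seek failover suggested above re-delivers some already-processed records. One way to absorb those dups is a per-partition high-watermark filter; the sketch below is illustrative only (the class name and string partition keys are made up, not a Kafka API).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: drop records already processed before a failover
// re-seek, by tracking the highest processed offset per partition.
public class DupFilter {
    // partition id -> highest offset already processed
    private final Map<String, Long> lastProcessed = new HashMap<>();

    // Returns true if the record at this offset is new (and marks it
    // processed); false if it is a duplicate delivered by the re-seek.
    public boolean shouldProcess(String partition, long offset) {
        Long last = lastProcessed.get(partition);
        if (last != null && offset <= last) {
            return false; // at or below the watermark: duplicate, skip
        }
        lastProcessed.put(partition, offset);
        return true;
    }
}
```

A consumer would consult this filter right after poll(), before handing records to user code.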
On Fri, Nov 23, 2018 at 2:27 AM Edoardo Comar  wrote:
>
> Hi Stanislav
>
> > > The flag is needed to distinguish a batch with a desired base offset
> of
> > 0,
> > from a regular batch for which offsets need to be generated.
> > If the producer can provide offsets, why not provide a base offset of 0?
>
> a regular batch (for which offsets are generated by the broker on write)
> is sent with a base offset of 0.
> How could you distinguish it from a batch where you *want* the first
> record to be written at offset 0 (i.e. be the first in the partition and
> be rejected if there are records on the log already) ?
> We wanted to avoid a "deep" inspection (and potentially decompression) of
> the records.
>
> For the replicator use case, a single produce request where all the data
> is to be assumed with offset,
> or all without offsets, seems to suffice,
> So we added only a toplevel flag, not a per-topic-partition one.
>
> Thanks for your interest !
> cheers
> Edo
> --
>
> Edoardo Comar
>
> IBM Event Streams
> IBM UK Ltd, Hursley Park, SO21 2JN
>
>
> Stanislav Kozlovski  wrote on 22/11/2018 22:32:42:
>
> > From: Stanislav Kozlovski 
> > To: dev@kafka.apache.org
> > Date: 22/11/2018 22:33
> > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for
> > Cluster Replication
> >
> > Hey Edo & Mickael,
> >
> > > The flag is needed to distinguish a batch with a desired base offset
> of
> > 0,
> > from a regular batch for which offsets need to be generated.
> > If the producer can provide offsets, why not provide a base offset of 0?
> >
> > > (I am reading your post thinking about
> > partitions rather than topics).
> > Yes, I meant partitions. Sorry about that.
> >
> > Thanks for answering my questions :)
> >
> > Best,
> > Stanislav
> >
> > On Thu, Nov 22, 2018 at 5:28 PM Edoardo Comar  wrote:
> >
> > > Hi Stanislav,
> > >
> > > you're right we envision the replicator use case to have a single
> producer
> > > with offsets per partition (I am reading your post thinking about
> > > partitions rather than topics).
> > >
> > > If a regular producer was to send its own records at the same time,
> it's
> > > very likely that the one sending with an offset will fail because of
> > > invalid offsets.
> > > Same if two producers were sending with offsets, likely both would
> then
> > > fail.
> > >
> > > > Does it make sense to *lock* the topic from other producers while
> there
> > > is
> > > > one that uses offsets?
> > >
> > > You could do that with ACL permissions if you wanted, I don't think it
> > > needs to be mandated by changing the broker logic.
> > >
> > >
> > > > Since we are tying the produce-with-offset request to the ACL, do we
> > > need
> > > > the `use_offset` field in the produce request? Maybe we make it
> > > mandatory
> > > > for produce requests with that ACL to have offsets.
> > >
> > > The flag is needed to distinguish a batch with a desired base offset
> of 0,
> > > from a regular batch for which offsets need to be generated.
> > > I would not restrict a principal to only send-with-offsets (by making
> that
> > > mandatory via the ACL).
> > >
> > > Thanks
> > > Edo & Mickael
> > >
> > > --
> > >
> > > Edoardo Comar
> > >
> > > IBM Event Streams
> > > IBM UK Ltd, Hursley Park, SO21 2JN
> > >
> > >
> > > Stanislav Kozlovski  wrote on 22/11/2018
> 16:17:11:
> > >
> > > > From: Stanislav Kozlovski 
> > > > To: dev@kafka.apache.org
> > > > Date: 22/11/2018 16:17
> > > > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for
> > > > Cluster Replication
> > > >
> > > > Hey Edoardo, thanks for the KIP!
> > > >
> > > > I have some questions, apologies if they are naive:
> > > > Is this intended to work for a single producer use case only?
> > > > How would it 

Re: [DISCUSS] KIP-388 Add observer interface to record request and response

2018-11-08 Thread radai
another downside to client instrumentation (beyond the number of
client codebases one would need to cover) is that in a large
environment you'll have a very long tail of applications using older
clients that would need to upgrade - it would be a long and disruptive
process (as opposed to updating broker-side instrumentation)
On Thu, Nov 8, 2018 at 11:04 AM Peter M. Elias  wrote:
>
> I know we have a lot of use cases for this type of functionality at my
> enterprise deployment. I think it's helpful for maintaining reliability of
> the cluster especially and identifying clients that are not properly tuned
> and therefore applying excessive load to the brokers. Additionally, there
> is a bit of a dark spot without something like as currently. For example,
> if a client is not using a consumer group, there is no direct way to query
> the state of the consumer without looking at raw network connections to
> determine the extent of the traffic generated by that particular consumer.
>
> While client instrumentation can certainly help with this currently, given
> that Kafka is intended to be a shared service across a potentially very
> large surface area of clients, central observation of client activity is in
> my opinion an essential feature.
>
> Peter
>
> On Thu, Nov 8, 2018 at 12:13 PM radai  wrote:
>
> > bump.
> >
> > I think the proposed API (Observer) is useful for any sort of
> > multi-tenant environment for chargeback and reporting purposes.
> >
> > if no one wants to comment, can we initiate a vote?
> > On Mon, Nov 5, 2018 at 6:31 PM Lincong Li  wrote:
> > >
> > > Hi everyone. Here
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-388%3A+Add+observer+interface+to+record+request+and+response
> > > is
> > > my KIP. Any feedback is appreciated.
> > >
> > > Thanks,
> > > Lincong Li
> >


Re: [DISCUSS] KIP-388 Add observer interface to record request and response

2018-11-08 Thread radai
bump.

I think the proposed API (Observer) is useful for any sort of
multi-tenant environment for chargeback and reporting purposes.

if no one wants to comment, can we initiate a vote?
On Mon, Nov 5, 2018 at 6:31 PM Lincong Li  wrote:
>
> Hi everyone. Here
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-388%3A+Add+observer+interface+to+record+request+and+response
> is
> my KIP. Any feedback is appreciated.
>
> Thanks,
> Lincong Li


[jira] [Resolved] (KAFKA-7475) print the actual cluster bootstrap address on authentication failures

2018-10-11 Thread radai rosenblatt (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt resolved KAFKA-7475.
-
   Resolution: Fixed
 Reviewer: Rajini Sivaram
Fix Version/s: 2.1.0

> print the actual cluster bootstrap address on authentication failures
> -
>
> Key: KAFKA-7475
> URL: https://issues.apache.org/jira/browse/KAFKA-7475
> Project: Kafka
>  Issue Type: Improvement
>    Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.1.0
>
>
> currently when a kafka client fails to connect to a cluster, users see 
> something like this:
> {code}
> Connection to node -1 terminated during authentication. This may indicate 
> that authentication failed due to invalid credentials. 
> {code}
> that log line is mostly useless in identifying which (of potentially many) 
> kafka client is having issues and what kafka cluster is it having issues with.
> would be nice to record the remote host/port



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7475) print the actual cluster bootstrap address on authentication failures

2018-10-02 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-7475:
---

 Summary: print the actual cluster bootstrap address on 
authentication failures
 Key: KAFKA-7475
 URL: https://issues.apache.org/jira/browse/KAFKA-7475
 Project: Kafka
  Issue Type: Improvement
Reporter: radai rosenblatt


currently when a kafka client fails to connect to a cluster, users see 
something like this:
{code}
Connection to node -1 terminated during authentication. This may indicate that 
authentication failed due to invalid credentials. 
{code}

that log line is mostly useless in identifying which (of potentially many) 
kafka client is having issues and what kafka cluster is it having issues with.

would be nice to record the remote host/port



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7473) allow configuring kafka client configs to not warn for unknown config properties

2018-10-02 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-7473:
---

 Summary: allow configuring kafka client configs to not warn for 
unknown config properties
 Key: KAFKA-7473
 URL: https://issues.apache.org/jira/browse/KAFKA-7473
 Project: Kafka
  Issue Type: Improvement
Reporter: radai rosenblatt


as the config handed to a client may contain config keys for use by either 
modular code in the client (serializers, deserializers, interceptors) or the 
subclasses of the client class, having "unknown" (to the vanilla client) 
configs logged as a warning is an annoyance.

it would be nice to have a constructor parameter that controls this behavior 
(just like there's already a flag for `boolean doLog`)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-03-13 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6648:
---

 Summary: Fetcher.getTopicMetadata() only returns "healthy" 
partitions, not all
 Key: KAFKA-6648
 URL: https://issues.apache.org/jira/browse/KAFKA-6648
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.1
Reporter: radai rosenblatt
Assignee: radai rosenblatt


{code}
if (!shouldRetry) {
   HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
   for (String topic : cluster.topics())
  topicsPartitionInfos.put(topic, 
cluster.availablePartitionsForTopic(topic));
   return topicsPartitionInfos;
}
{code}

this leads to inconsistent behavior upstream, for example in 
KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions 
would be returned, whereas if MD doesnt exist (or has expired) a subset of 
partitions (only the healthy ones) would be returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6622) GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly

2018-03-07 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6622:
---

 Summary: GroupMetadataManager.loadGroupsAndOffsets decompresses 
record batch needlessly
 Key: KAFKA-6622
 URL: https://issues.apache.org/jira/browse/KAFKA-6622
 Project: Kafka
  Issue Type: Bug
Reporter: radai rosenblatt
Assignee: radai rosenblatt
 Attachments: kafka batch iteration funtime.png

when reading records from a consumer offsets batch, the entire batch is 
decompressed multiple times (per record) as part of calling `batch.baseOffset`. 
this is a very expensive operation being called in a loop for no reason:
!kafka batch iteration funtime.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2018-02-08 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt resolved KAFKA-6345.
-
Resolution: Fixed

> NetworkClient.inFlightRequestCount() is not thread safe, causing 
> ConcurrentModificationExceptions when sensors are read
> ---
>
> Key: KAFKA-6345
> URL: https://issues.apache.org/jira/browse/KAFKA-6345
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: radai rosenblatt
>Assignee: Sean McCauliff
>Priority: Major
> Fix For: 1.1.0
>
>
> example stack trace (code is ~0.10.2.*)
> {code}
> java.util.ConcurrentModificationException: 
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
> {code}
> looking at latest trunk, the code is still vulnerable:
> # NetworkClient.inFlightRequestCount() eventually iterates over 
> InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
> HashMap
> # this will be called from the "requests-in-flight" sensor's measure() method 
> (Sender.java line  ~765 in SenderMetrics ctr), which would be driven by some 
> thread reading JMX values
> # HashMap in question would also be updated by some client io thread calling 
> NetworkClient.doSend() - which calls into InFlightRequests.add())
> i guess the only upside is that this exception will always happen on the 
> thread reading the JMX values and never on the actual client io thread ...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2017-12-11 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6345:
---

 Summary: NetworkClient.inFlightRequestCount() is not thread safe, 
causing ConcurrentModificationExceptions when sensors are read
 Key: KAFKA-6345
 URL: https://issues.apache.org/jira/browse/KAFKA-6345
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.0
Reporter: radai rosenblatt


example stack trace (code is ~0.10.2.*)
{code}
java.util.ConcurrentModificationException: 
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
at 
org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
at 
org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
at 
org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at 
org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
at 
org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
{code}

looking at latest trunk, the code is still vulnerable:
# NetworkClient.inFlightRequestCount() eventually iterates over 
InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
HashMap
# this will be called from the "requests-in-flight" sensor's measure() method 
(Sender.java line  ~765 in SenderMetrics ctr), which would be driven by some 
thread reading JMX values
# HashMap in question would also be updated by some client io thread calling 
NetworkClient.doSend() - which calls into InFlightRequests.add())

i guess the only upside is that this exception will always happen on the thread 
reading the JMX values and never on the actual client io thread ...



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6216) kafka logs for misconfigured ssl clients are unhelpful

2017-11-15 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6216:
---

 Summary: kafka logs for misconfigured ssl clients are unhelpful
 Key: KAFKA-6216
 URL: https://issues.apache.org/jira/browse/KAFKA-6216
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: radai rosenblatt


if you misconfigure the keystores on an ssl client, you will currently get a 
log full of these:
```
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
at org.apache.kafka.common.network.Selector.close(Selector.java:531)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
at java.lang.Thread.run(Thread.java:745)
```
these are caught and printed as part of the client Selector trying to close the 
channel after having caught an IOException (lets call that the root issue).

the root issue itself is only logged at debug, which is not enabled 99% of the 
time, leaving users with very few clues as to what's gone wrong.

after turning on debug log, the root issue clearly indicated what the problem 
was in our case:
```
javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1409)
at 
sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at 
sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:382)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:243)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:69)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
at java.lang.Thread.run(Thread.java:745)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1478)
at 
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:212)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:957)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:897)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:894)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1347)
at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:336)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:417)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:270)
... 7 more
Caused by: sun.security.validator.ValidatorException: No trusted certificate 
found
at 
sun.security.validator.SimpleValidator.buildTrustedChain(SimpleValidator.java:384)
at 
sun.security.validator.SimpleValidator.engineValidate(SimpleValidator.java:133)
at sun.security.validator.Validator.validate(Validator.java:260

[GitHub] kafka pull request #4223: when closing a socket in response to an IOExceptio...

2017-11-15 Thread radai-rosenblatt
GitHub user radai-rosenblatt opened a pull request:

https://github.com/apache/kafka/pull/4223

when closing a socket in response to an IOException, also print the root 
issue if closing fails

`Selector.pollSelectionKeys()` attempts to close the channel in response to 
an Exception (let's call this exception the root issue).

If the root issue itself is an IOException, it is printed to the log at debug 
level (which is usually off for production users):

```java
catch (Exception e) {
    String desc = channel.socketDescription();
    if (e instanceof IOException)
        log.debug("Connection with {} disconnected", desc, e); // <-- does not appear in real-life logs
    else if (e instanceof AuthenticationException) // will be logged later as error by clients
        log.debug("Connection with {} disconnected due to authentication exception", desc, e);
    else
        log.warn("Unexpected error from {}; closing connection", desc, e);
    close(channel, true);
}
```

In some cases, close itself throws an exception, which is then logged by 
`Selector.doClose()`:
```java
try {
    channel.close();
} catch (IOException e) {
    log.error("Exception closing connection to node {}:", channel.id(), e);
}
```
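For context, the kind of fix this PR is after can be sketched roughly like this (illustrative only: `closeAndPreserve` and its signature are made up for this sketch, not the actual patch). The idea is to attach the root issue to the close failure via `addSuppressed`, so the one log line that does appear at warn/error level carries both stack traces:

```java
import java.io.Closeable;
import java.io.IOException;

public class CloseWithRootCause {
    // Hypothetical helper: close the channel, and if closing fails, attach
    // the root issue as a suppressed exception so whoever logs the close
    // failure also sees what triggered the close in the first place.
    static Exception closeAndPreserve(Closeable channel, Exception rootIssue) {
        try {
            channel.close();
            return rootIssue;
        } catch (IOException closeFailure) {
            closeFailure.addSuppressed(rootIssue);
            return closeFailure; // log this once, root cause included
        }
    }

    public static void main(String[] args) {
        Exception root = new IOException("Connection reset by peer");
        Closeable failingChannel = () -> { throw new IOException("close failed"); };
        Exception toLog = closeAndPreserve(failingChannel, root);
        System.out.println(toLog.getMessage());
        System.out.println("suppressed: " + toLog.getSuppressed()[0].getMessage());
    }
}
```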

This close-failure error actually tends to show up in users' logs, looking 
something like this (note: line numbers are from kafka 0.10.2.*):
```
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
at org.apache.kafka.common.network.Selector.close(Selector.java:531)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
at java.lang.Thread.run(Thread.java:745)
```

This spams users' logs and is not really helpful in diagnosing the real 
underlying cause, which (after turning debug logs on) turned out to be, for 
this particular case:
```
javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1409)
at 
sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at 
sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:382)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:243)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:69)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
at java.lang.Thread.run(Thread.java:745)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1478)
at 
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:212)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:957)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:897)
at sun.securi

[jira] [Created] (KAFKA-5190) builds with low parallelism exhaust system open files and crash

2017-05-07 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-5190:
---

 Summary: builds with low parallelism exhaust system open files and 
crash
 Key: KAFKA-5190
 URL: https://issues.apache.org/jira/browse/KAFKA-5190
 Project: Kafka
  Issue Type: Bug
Reporter: radai rosenblatt


In an attempt to produce more stable builds, I tried reducing the parallelism:
```
export GRADLE_OPTS=-Dorg.gradle.daemon=false
./gradlew clean build -PmaxParallelForks=1
```
which made things much worse - I now have builds that fail due to hitting the 
system's maximum number of open files (4096 in my case).

This happens with or without the gradle daemon.
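As a stopgap (assuming the underlying file-descriptor usage itself isn't reduced), the per-process soft limit can be raised for the build shell before rerunning. This is generic shell usage, not a project-recommended fix:

```shell
# check the current per-process open-file limit (the build above hit 4096)
ulimit -n

# raise the soft limit for this shell, up to the hard limit
ulimit -Sn "$(ulimit -Hn)"
ulimit -n

# then rerun the build in the same shell:
#   ./gradlew clean build -PmaxParallelForks=1
```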



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5189) trunk unstable - DescribeConsumerGroupTest.testDescribeGroupWithNewConsumerWithShortInitializationTimeout fails 90% of times

2017-05-07 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-5189:
---

 Summary: trunk unstable - 
DescribeConsumerGroupTest.testDescribeGroupWithNewConsumerWithShortInitializationTimeout
 fails 90% of times
 Key: KAFKA-5189
 URL: https://issues.apache.org/jira/browse/KAFKA-5189
 Project: Kafka
  Issue Type: Bug
Reporter: radai rosenblatt


Ran the build 10 times on my machine; it fails 90% of the time.





[GitHub] kafka pull request #2025: KAFKA-4293 - improve ByteBufferMessageSet.deepIter...

2017-05-06 Thread radai-rosenblatt
Github user radai-rosenblatt closed the pull request at:

https://github.com/apache/kafka/pull/2025


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2814: change language level from java 7 to 8

2017-04-05 Thread radai-rosenblatt
Github user radai-rosenblatt closed the pull request at:

https://github.com/apache/kafka/pull/2814




[GitHub] kafka pull request #2814: change language level from java 7 to 8

2017-04-05 Thread radai-rosenblatt
GitHub user radai-rosenblatt opened a pull request:

https://github.com/apache/kafka/pull/2814

change language level from java 7 to 8

Now that KIP-118 has passed, and there are no major releases coming before 
0.11, the language level can move to Java 8.

Signed-off-by: radai-rosenblatt <radai.rosenbl...@gmail.com>

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/radai-rosenblatt/kafka java8-ftw

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2814.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2814


commit 0709f9613e9f8f729cf2cc5cf0927ce8ded70def
Author: radai-rosenblatt <radai.rosenbl...@gmail.com>
Date:   2017-04-05T14:17:05Z

change language level from java 7 to 8

Signed-off-by: radai-rosenblatt <radai.rosenbl...@gmail.com>






Re: [VOTE] KIP-112 - Handle disk failure for JBOD

2017-04-03 Thread radai
+1, LGTM

On Mon, Apr 3, 2017 at 9:49 AM, Dong Lin  wrote:

> Hi all,
>
> It seems that there is no further concern with the KIP-112. We would like
> to start the voting process. The KIP can be found at
> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 112%3A+Handle+disk+failure+for+JBOD
>  112%3A+Handle+disk+failure+for+JBOD>.*
>
> Thanks,
> Dong
>


Re: KafkaProducer may send duplicated message sometimes

2017-03-31 Thread radai
Removing the timeout completely would cause clients to just "hang" if the
broker is unavailable (even worse: if only one broker out of a cluster goes
down, a producer that could otherwise have sent data to other brokers would
fill up its memory with records waiting to be sent to the one that's down).

If you're seeing this issue often and your brokers are stable, then your
timeout values are likely too short for your latency/load.

But like I said, as the code is right now this situation is unavoidable
(just unlikely under good configurations and a stable environment), and
client code needs to be written to account for it - either "dedup" the
things you read or make sure read side-effects are idempotent in some other
way.

Longer-term, when KIP-98 is implemented, you could consider switching to it.
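For the "dedup things you read" option, here is a minimal consumer-side sketch. It assumes the producer embeds a unique id per logical message (Kafka record keys alone may legitimately repeat, so they are not enough on their own); the class name and bounds are illustrative:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class DedupSketch {
    private final Set<String> seen;

    // Bounded LRU set of recently seen record ids; once full, the
    // least-recently-seen id is evicted to cap memory usage.
    public DedupSketch(final int capacity) {
        this.seen = Collections.newSetFromMap(
            new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > capacity;
                }
            });
    }

    /** true the first time an id is seen, false for a duplicate delivery */
    public boolean firstTime(String recordId) {
        return seen.add(recordId);
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch(100_000);
        System.out.println(dedup.firstTime("msg-1")); // true
        System.out.println(dedup.firstTime("msg-1")); // false (redelivery)
    }
}
```

The LRU bound trades correctness for memory: a duplicate arriving after eviction slips through, which is why the window should comfortably exceed the producer's retry horizon.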

On Fri, Mar 31, 2017 at 9:28 AM, Yang Cui <y...@freewheel.tv> wrote:

> Hi Radai,
>Thanks for your reply, sincerely, I am so glad for that.
>In my opinion, I am clear that Kafka only provides the “at least once”
> semantics now, but I think Kafka should try it best to decrease the
> duplicated message case if it can. In this case described in my JIRA, I
> think Kafka can remove timeout check code to decrease the possibility of
> duplicating message.
>
>
> On 31/03/2017, 11:17 PM, "radai" <radai.rosenbl...@gmail.com> wrote:
>
> kafka (at least out of the box as it is now) is not an exactly-once
> system.
> its an at-least-once system, meaning the scenario you described (and
> similar ones involving socket disconnections, for example) exist by
> design.
>
> there is a KIP for adding exactly once guarantees (among other things)
> that
> you can read here -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>
>
> On Fri, Mar 31, 2017 at 4:07 AM, Yang Cui <y...@freewheel.tv> wrote:
>
> > Hi All,
> >There is a JIRA issue which I submitted:
> https://issues.apache.org/
> > jira/browse/KAFKA-4951;
> >It is about the Sending duplicated message by KafkaProducer,
> Could you
> > please check it?
> >Thanks.
> >
> > Yang Cui
> >
> >
>
>
>


Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

2017-03-31 Thread radai
>> >> However, currently the coordinator connection are made
> > > > different
> > > >> by
> > > >> >> >> using:
> > > >> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> > > >> >> >> >> for the Node id.
> > > >> >> >> >>
> > > >> >> >> >> So to identify Coordinator connections, we'd have to check
> > that
> > > >> the
> > > >> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE
> which
> > > > is a
> > > >> >> bit
> > > >> >> >> >> hacky ...
> > > >> >> >> >>
> > > >> >> >> >> Maybe we could add a constructor to Node that allows to
> pass
> > in
> > > > a
> > > >> >> >> >> sourceId String. That way we could make all the coordinator
> > > >> >> >> >> connections explicit (by setting it to "Coordinator-[ID]"
> for
> > > >> >> >> >> example).
> > > >> >> >> >> What do you think ?
> > > >> >> >> >>
> > > >> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> > > >> >> ja...@confluent.io>
> > > >> >> >> >> wrote:
> > > >> >> >> >> > Good point. The consumer does use a separate connection
> to
> > > > the
> > > >> >> >> >> coordinator,
> > > >> >> >> >> > so perhaps the connection itself could be tagged for
> normal
> > > > heap
> > > >> >> >> >> allocation?
> > > >> >> >> >> >
> > > >> >> >> >> > -Jason
> > > >> >> >> >> >
> > > >> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> > > >> >> >> >> onurkaraman.apa...@gmail.com
> > > >> >> >> >> >> wrote:
> > > >> >> >> >> >
> > > >> >> >> >> >> I only did a quick scan but I wanted to point out what I
> > > > think
> > > >> is
> > > >> >> an
> > > >> >> >> >> >> incorrect assumption in the KIP's caveats:
> > > >> >> >> >> >> "
> > > >> >> >> >> >> There is a risk using the MemoryPool that, after we fill
> > up
> > > > the
> > > >> >> >> memory
> > > >> >> >> >> with
> > > >> >> >> >> >> fetch data, we can starve the coordinator's connection
> > > >> >> >> >> >> ...
> > > >> >> >> >> >> To alleviate this issue, only messages larger than 1Kb
> > will
> > > > be
> > > >> >> >> >> allocated in
> > > >> >> >> >> >> the MemoryPool. Smaller messages will be allocated
> > directly
> > > > on
> > > >> the
> > > >> >> >> Heap
> > > >> >> >> >> >> like before. This allows group/heartbeat messages to
> avoid
> > > >> being
> > > >> >> >> >> delayed if
> > > >> >> >> >> >> the MemoryPool fills up.
> > > >> >> >> >> >> "
> > > >> >> >> >> >>
> > > >> >> >> >> >> So it sounds like there's an incorrect assumption that
> > > >> responses
> > > >> >> from
> > > >> >> >> >> the
> > > >> >> >> >> >> coordinator will always be small (< 1Kb as mentioned in
> > the
> > > >> >> caveat).
> > > >> >> >> >> There
> > > >> >> >> >> >> are now a handful of request types between clients and
> the
> > > >> >> >> coordinator:
> > > >> >> >> >> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat,
> > OffsetCommit,
> >

Re: KafkaProducer may send duplicated message sometimes

2017-03-31 Thread radai
Kafka (at least out of the box, as it is now) is not an exactly-once system;
it's an at-least-once system, meaning the scenario you described (and similar
ones involving socket disconnections, for example) exists by design.

There is a KIP for adding exactly-once guarantees (among other things) that
you can read here -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging


On Fri, Mar 31, 2017 at 4:07 AM, Yang Cui  wrote:

> Hi All,
>There is a JIRA issue which I submitted: https://issues.apache.org/
> jira/browse/KAFKA-4951;
>It is about the Sending duplicated message by KafkaProducer, Could you
> please check it?
>Thanks.
>
> Yang Cui
>
>


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-03-24 Thread radai
; > >> > > > > > > > > +
> > > >> > > > > > > > > > > > > name;".
> > > >> > > > > > > > > > > > > It would be good if we can keep the same
> > > >> convention
> > > >> > > after
> > > >> > > > > > this
> > > >> > > > > > > > KIP.
> > > >> > > > > > > > > > One
> > > >> > > > > > > > > > > > way
> > > >> > > > > > > > > > > > > to do that is to convert
> > java.security.Principal
> > > >> to
> > > >> > > > > > > > KafkaPrincipal
> > > >> > > > > > > > > > for
> > > >> > > > > > > > > > > > > logging the requests.
> > > >> > > > > > > > > > > > > --- > This would mean we have to create a
> new
> > > >> > > > > KafkaPrincipal
> > > >> > > > > > on
> > > >> > > > > > > > > each
> > > >> > > > > > > > > > > > > request. Would it be OK to just specify the
> > name
> > > >> of
> > > >> > the
> > > >> > > > > > > > principal.
> > > >> > > > > > > > > > > > > Is there any major reason, we don't want to
> > > change
> > > >> > the
> > > >> > > > > > logging
> > > >> > > > > > > > > > format?
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > Mayuresh
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > On Mon, Feb 20, 2017 at 10:18 PM, Jun Rao <
> > > >> > > > > j...@confluent.io>
> > > >> > > > > > > > > wrote:
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > Hi, Mayuresh,
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > Thanks for the updated KIP. A couple of
> more
> > > >> > > comments.
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > 1. Do we convert java.security.Principal
> to
> > > >> > > > > KafkaPrincipal
> > > >> > > > > > > for
> > > >> > > > > > > > > > > > > > authorization check in
> SimpleAclAuthorizer?
> > If
> > > >> so,
> > > >> > it
> > > >> > > > > would
> > > >> > > > > > > be
> > > >> > > > > > > > > > useful
> > > >> > > > > > > > > > > > to
> > > >> > > > > > > > > > > > > > mention that in the wiki so that people
> can
> > > >> > > understand
> > > >> > > > > how
> > > >> > > > > > > this
> > > >> > > > > > > > > > > change
> > > >> > > > > > > > > > > > > > doesn't affect the default authorizer
> > > >> > implementation.
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > 2. Currently, we log the principal name in
> > the
> > > >> > > request
> > > >> > > > > 

Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-03-13 Thread radai
 methods
> > > > never throw exceptions.  Instead, if you call get() / whenComplete()
> /
> > > > isCompletedExceptionally() / etc. on one of the CompletableFuture
> > > > objects, you will get the exception.  This is to allow Node.js-style
> > > > completion chaining.  I will add this explanation to the KIP.
> > > >
> > > > > I'm particularly interested in what a user can expect if a create
> topics
> > > > > succeeds versus what they can expect if a timeout exception is
> thrown. It
> > > > > would be good to consider partial failures as well.
> > > >
> > > > This is spelled out by KIP-4.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+
> > > > Command+line+and+centralized+administrative+operations
> > > >
> > > > Specifically,
> > > >
> > > > > If a timeout error occurs [in CreateTopic], the topic could still
> be
> > > > > created successfully at a later time. Its up to the client to query
> > > > > for the state at that point.
> > > >
> > > > Since we're specifically not changing the server as part of this KIP,
> > > > those semantics will still be in force.  Of course, there are plenty
> of
> > > > other exceptions that you can get from CreateTopics that are more
> > > > meaningful, such as permission-related or network-related ones.  But
> if
> > > > you get a timeout, the operation may or may not have succeeded.
> > > >
> > > > Could we fix the timeout problem?  Sort of.  We could implement
> > > > something like a retry cache.  The brokers would have to maintain a
> > > > cache of operations (and their results) which had succeeded or
> failed.
> > > > Then, if an RPC got interrupted after the server had performed it,
> but
> > > > before the client had received the response message, the client could
> > > > simply reconnect on another TCP session and ask the broker for the
> > > > result of the previous operation.  The broker could look up the
> result
> > > > in the cache and re-send it.
> > > >
> > > > This fix works, but it is very complex.  The cache requires space in
> > > > memory (and to do it perfectly, you also want to persist the cache to
> > > > disk in case the broker restarts and the client re-appears).  The fix
> > > > also requires the client to wait for an indefinite amount of time for
> > > > the server to come back.  If the client ever "gives up" and just
> throws
> > > > a timeout exception, we are back to not knowing what happened on the
> > > > server.
> > > >
> > > > In any case, I think we should discuss RPC change in a separate
> KIP...
> > > > the scope is already big enough here.  Also, in practice, users have
> > > > workarounds for cases where there are timeouts or failures to
> > > > communicate.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Fri, Mar 3, 2017 at 9:37 PM, Colin McCabe <cmcc...@apache.org>
> wrote:
> > > > >
> > > > > > On Fri, Mar 3, 2017, at 06:41, Ismael Juma wrote:
> > > > > > > Hi Colin,
> > > > > > >
> > > > > > > I still need to do a detailed review, but I have a couple of
> > > > > > > comments/questions:
> > > > > > >
> > > > > > > 1. I am not sure about having the options/response classes as
> inner
> > > > > > > classes
> > > > > > > of the interface. It means that file containing the interface
> will be
> > > > > > > huge
> > > > > > > eventually. And the classes are not necessarily related
> either. Why
> > > > not
> > > > > > > use
> > > > > > > a separate package for them?
> > > > > >
> > > > > > Yeah, I think it's reasonable to make these top-level classes
> and put
> > > > > > them in separate files.  We can put them all in
> > > > > > org.apache.kafka.clients.admin.
> > > > > >
> > > > > > >
> > > > > > > 2. Can you elaborate on how one decides one goes in the
> Options class
> > > > > &

Re: [DISCUSS] KIP-82 - Add Record Headers

2017-03-13 Thread radai
the common "stack" we envision at linkedin would consist of (at least) the
following components that add headers to every outgoing request:

1. auditing/"lineage" - appends a header containing "node" (hostname etc),
time (UTC time) and destination (cluster/topic). these accumulate as
requests get mirrored between clusters
2. serialization - sets a header containing a schema identifier to allow
deserialization
3. client-side encryption - would probably set a header identifying the
key/scheme used
4. internal "billing"

there are also several other teams at linkedin that would use headers
(although its unclear yet if via interceptors or by directly manipulating
requests)

if headers are made completely immutable (as the entire request object
currently is) we would end up copying (parts of) every msg 4 times. I
havent benchmarked but this seems like it would have an impact to me.

looking elsewhere rabbitMQ and http components both use mutable request
objects (rabbitMW's BasicProperties object, http components' addHeader
method).

how common is it right now for instances of ProducerRecord to actually be
reused?
do people really have things like publis static final ProducerRecord
MY_FAVORITE_REQUEST = ... ?


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-03-07 Thread radai
Just to clarify - ListIterator is a nice API, and doesn't constrain the
implementation much more than Iterator does (especially if we implement
previous() very inefficiently :-) ), but changing
`Iterable<Header> headers(String key)`
into
`ListIterator<Header> headers(String key)`
would lose us the ability to easily write what I think is the most common
case - a for-each loop:

for (Header stop : headers("lineage")) {
    // examine stop
}
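To make the trade-off concrete, here is a small self-contained sketch (the `Header`/`Headers` classes below are illustrative stand-ins, not the KIP-82 API): returning a `List<Header>` keeps the common for-each loop working, while callers who need positional add/remove can still obtain a ListIterator via `List.listIterator()`:

```java
import java.util.ArrayList;
import java.util.List;

public class HeadersSketch {
    // Minimal stand-in; the real KIP-82 Header carries a key and a byte[] value.
    static class Header {
        final String key; final String value;
        Header(String key, String value) { this.key = key; this.value = value; }
    }

    static class Headers {
        private final List<Header> all = new ArrayList<>();
        void append(String key, String value) { all.add(new Header(key, value)); }

        // Returning List<Header> rather than a bare ListIterator<Header>
        // supports both for-each and positional mutation.
        List<Header> headers(String key) {
            List<Header> matches = new ArrayList<>();
            for (Header h : all) if (h.key.equals(key)) matches.add(h);
            return matches;
        }
    }

    public static void main(String[] args) {
        Headers hs = new Headers();
        hs.append("lineage", "cluster-A");
        hs.append("schema", "42");
        hs.append("lineage", "cluster-B");
        for (Header stop : hs.headers("lineage")) { // for-each still works
            System.out.println(stop.value);
        }
    }
}
```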

On Tue, Mar 7, 2017 at 12:31 PM, radai <radai.rosenbl...@gmail.com> wrote:

> ing, as you call it, would probably be implemente


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-03-07 Thread radai
Where do you see insert-in-the-middle/replace being commonly used?
Lineage tracing, as you call it, would probably be implemented by way of:

1. every "stop" along the way appending itself (at the end)
2. some replication technologies that, instead of just doing #1, may clear
out everything when they replicate (starting from a clean slate)



On Mon, Mar 6, 2017 at 11:00 AM, Colin McCabe  wrote:

> As others have mentioned, it seems clear that we want to preserve the
> ordering of message headers, so that we can implement things like
> lineage tracing.  (For example, each stage could add a "lineage:"
> header.)  I also think that we want the ability to add and remove
> headers as needed.  It would be really unfortunate if the only way to
> remove a message header or add a header at a certain position was the
> duplicate the whole message and re-create everything.
>
> So how about implementing ListIterator?
> https://docs.oracle.com/javase/7/docs/api/java/util/ListIterator.html
> It supports adding and removing things at arbitrary positions.  For
> people who want to use it as a simple Iterator, it is one (and you can
> use all the fancy syntax such as Java's foreach with it).  For people
> who want to add and remove things at arbitrary locations, they can.  And
> it doesn't expose the implementation, so that can be changed later.  We
> can materialize things in memory lazily if we want to, and so forth.  I
> think using the standard interface is better than rolling our own
> nonstandard collection or iterator interfaces.
>
> regards,
> Colin
>
>
> On Wed, Mar 1, 2017, at 12:59, Becket Qin wrote:
> > Hi Ismael,
> >
> > Thanks for the reply. Please see the comments inline.
> >
> > On Wed, Mar 1, 2017 at 6:47 AM, Ismael Juma  wrote:
> >
> > > Hi Becket,
> > >
> > > Thanks for sharing your thoughts. More inline.
> > >
> > > On Wed, Mar 1, 2017 at 2:54 AM, Becket Qin 
> wrote:
> > >
> > > > As you can imagine if the ProducerRecord has a value as a List and
> the
> > > > Interceptor.onSend() can actually add an element to the List. If the
> > > > producer.send() is called on the same ProducerRecord again, the value
> > > list
> > > > would have one more element than the previously sent ProducerRecord
> even
> > > > though the ProducerRecord itself is not mutable, right? Same thing
> can
> > > > apply to any modifiable class type.
> > > >
> > >
> > > The difference is that the user chooses the value type. They are free
> to
> > > choose a mutable or immutable type. A generic interceptor cannot
> mutate the
> > > value because it doesn't know the type (and it could be immutable). One
> > > could write an interceptor that checked the type of the value at
> runtime
> > > and did things based on that. But again, the user who creates the
> record is
> > > in control.
> > >
> > But there is no generic interceptor, right? The interceptor always takes
> > specific K, V type.
> >
> >
> > > From this standpoint allowing headers to be mutable doesn't really
> weaken
> > > > the mutability we already have. Admittedly a mutable header is kind
> of
> > > > guiding user towards to change the headers in the existing object
> instead
> > > > of creating a new one.
> > >
> > >
> > > Yes, with headers, we are providing a model for the user (the user
> doesn't
> > > get to choose it like with keys and values) and for the interceptors.
> So, I
> > > think it's not the same.
> >
> >
> > >
> > > > But I think reusing an object while it is possible
> > > > to be modified by user code is a risk that users themselves are
> willing
> > > to
> > > > take. And we do not really protect against that.
> > >
> > >
> > > If users want to take that risk, it's fine. But we give them the
> option to
> > > protect themselves. With mutable headers, there is no option.
> >
> > If we want to let the users control the mutability, users can always call
> > headers.close() before calling producer.send() and that will force the
> > interceptor to create new ProducerRecord object.
> >
> > Because the headers are mostly useful for interceptors, unless the users
> > do
> > not want the interceptors to change their records, it seems reasonable to
> > say that by default modification of headers are allowed for the
> > interceptors.
> >
> > >
> > >
> > > > But there still seems
> > > > value to allow the users to not pay the overhead of creating tons of
> > > > objects if they do not reuse an object to send it twice, which is
> > > probably
> > > > a more common case.
> > > >
> > >
> > > I think the assumption that there would be tons of objects created is
> > > incorrect (I suggested a solution that would only involve one
> additional
> > > reference in the `Header` instance). The usability of the immutable API
> > > seemed to be more of an issue.
> > >
> > If we do not allow the users to add headers on existing ProducerRecord
> > objects, each interceptor who wants to add headers will have to create a
> > 

[GitHub] kafka pull request #2637: throw NoOffsetForPartitionException from poll once...

2017-03-03 Thread radai-rosenblatt
GitHub user radai-rosenblatt opened a pull request:

https://github.com/apache/kafka/pull/2637

throw NoOffsetForPartitionException from poll once for all TopicPartitions 
affected



Signed-off-by: radai-rosenblatt <radai.rosenbl...@gmail.com>

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/radai-rosenblatt/kafka KAFKA-4839

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2637.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2637


commit ee4291644e83fcaf8d3df4de295581562b4c80e4
Author: radai-rosenblatt <radai.rosenbl...@gmail.com>
Date:   2017-03-04T01:47:22Z

throw NoOffsetForPartitionException from poll once for all TopicPartitions 
affected

Signed-off-by: radai-rosenblatt <radai.rosenbl...@gmail.com>






[jira] [Created] (KAFKA-4839) throw NoOffsetForPartitionException once for all assigned partitions from poll

2017-03-03 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-4839:
---

 Summary: throw NoOffsetForPartitionException once for all assigned 
partitions from poll
 Key: KAFKA-4839
 URL: https://issues.apache.org/jira/browse/KAFKA-4839
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.10.2.0
Reporter: radai rosenblatt
Assignee: radai rosenblatt


KafkaConsumer.poll() will currently throw NoOffsetForPartitionException if 
the reset strategy is "none" and there are no defined offsets.

The problem is that the exception will only be thrown for the 1st such 
partition encountered.

Since a single consumer can be the owner of thousands of partitions, this 
results in a lot of exceptions that need to be caught and handled.

It's possible to throw the exception once for all such TopicPartitions 
without any user-visible API changes.
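A rough sketch of the proposed behavior (class and method names here are illustrative, not the actual consumer internals): collect every offending partition first, then throw once with the whole set attached, instead of failing on the first one encountered:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ThrowOnceSketch {
    // Hypothetical exception carrying ALL partitions lacking an offset.
    static class NoOffsetForPartitionsException extends RuntimeException {
        final Set<String> partitions;
        NoOffsetForPartitionsException(Set<String> partitions) {
            super("no reset strategy and no defined offset for: " + partitions);
            this.partitions = partitions;
        }
    }

    // Scan all assigned partitions, accumulate the ones with no offset,
    // and throw a single exception at the end.
    static void validateOffsets(Map<String, Long> offsets) {
        Set<String> missing = new TreeSet<>();
        for (Map.Entry<String, Long> e : offsets.entrySet())
            if (e.getValue() == null) missing.add(e.getKey());
        if (!missing.isEmpty())
            throw new NoOffsetForPartitionsException(missing);
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("t-0", 42L);
        offsets.put("t-1", null);
        offsets.put("t-2", null);
        try {
            validateOffsets(offsets);
        } catch (NoOffsetForPartitionsException e) {
            System.out.println(e.partitions); // both missing partitions at once
        }
    }
}
```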





Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-03-01 Thread radai
quick comment on the request objects:

i see "abstract class NewTopic" and "class NewTopicWithReplication" and "
NewTopicWithReplicaAssignments"

1. since the result object is called CreateTopicResults should these be
called *Request?
2. this seems like a suboptimal approach to me. imagine we add a
NewTopicWithSecurity, and then we would need
NewTopicWithReplicationAndSecurity? (or any composable "traits"). this wont
really scale. Wouldnt it be better to have a single (rather complicated)
CreateTopicRequest, and use a builder pattern to deal with the compexity
and options? like so:

CreateTopicRequest req =
AdminRequests.newTopic("bob").replicationFactor(2).withPartitionAssignment(1,
"boker7", "broker10").withOption(...).build();

the builder would validate any potentially conflicting options and would
allow piling on the complexity in a more manageable way (note - my code
above intends to demonstrate both a general replication factor and a
specific assignment for a partiocular partition of that topic, which may be
too much freedom).
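to make the builder suggestion above concrete, here is a minimal sketch of what such a request-plus-builder could look like. every name in it is illustrative (invented for this example), not the actual AdminClient API:

```java
import java.util.*;

// Hypothetical sketch of the single-request-plus-builder idea; all names here
// are illustrative, not the actual AdminClient API.
final class CreateTopicRequest {
    private final String name;
    private final int replicationFactor;
    private final Map<Integer, List<String>> assignments;

    private CreateTopicRequest(Builder b) {
        this.name = b.name;
        this.replicationFactor = b.replicationFactor;
        this.assignments = Collections.unmodifiableMap(b.assignments);
    }

    String name() { return name; }
    int replicationFactor() { return replicationFactor; }
    Map<Integer, List<String>> assignments() { return assignments; }

    static Builder newTopic(String name) { return new Builder(name); }

    static final class Builder {
        private final String name;
        private int replicationFactor = -1;
        private final Map<Integer, List<String>> assignments = new HashMap<>();

        private Builder(String name) { this.name = name; }

        Builder replicationFactor(int rf) { this.replicationFactor = rf; return this; }

        Builder withPartitionAssignment(int partition, String... brokers) {
            assignments.put(partition, Arrays.asList(brokers));
            return this;
        }

        CreateTopicRequest build() {
            // the builder is the natural place to reject conflicting options,
            // instead of multiplying NewTopicWith... subclasses
            if (replicationFactor > 0 && !assignments.isEmpty())
                throw new IllegalStateException(
                        "replication factor and explicit partition assignment conflict");
            return new CreateTopicRequest(this);
        }
    }
}
```

this keeps one request type no matter how many "traits" (replication, security, ...) get added later - each becomes another builder method plus a validation rule in build().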

On Wed, Mar 1, 2017 at 1:58 PM, Colin McCabe  wrote:

> Hi all,
>
> Thanks for commenting, everyone.  Does anyone have more questions or
> comments, or should we vote?  The latest proposal is up at
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+
> AdminClient+API+for+Kafka+admin+operations
>
> best,
> Colin
>
>
> On Thu, Feb 16, 2017, at 15:00, Colin McCabe wrote:
> > On Thu, Feb 16, 2017, at 14:11, Dong Lin wrote:
> > > Hey Colin,
> > >
> > > Thanks for the update. I have two comments:
> > >
> > > - I actually think it is simpler and good enough to have per-topic API
> > > instead of batch-of-topic API. This is different from the argument for
> > > batch-of-partition API because, unlike operation on topic, people
> usually
> > > operate on multiple partitions (e.g. seek(), purge()) at a time. Is
> there
> > > performance concern with per-topic API? I am wondering if we should do
> > > per-topic API until there is use-case or performance benefits of
> > > batch-of-topic API.
> >
> > Yes, there is a performance concern with only supporting operations on
> > one topic at a time.  Jay expressed this in some of his earlier emails
> > and some other people did as well.  We have cases in mind for management
> > software where many topics are created at once.
> >
> > >
> > > - Currently we have interface "Consumer" and "Producer". And we also
> have
> > > implementations of these two interfaces as "KafkaConsumer" and
> > > "KafkaProducer". If we follow the same naming pattern, should we have
> > > interface "AdminClient" and the implementation "KafkaAdminClient",
> > > instead
> > > of the other way around?
> >
> > That's a good point.  We should do that for consistency.
> >
> > best,
> > Colin
> >
> > >
> > > Dong
> > >
> > >
> > > On Thu, Feb 16, 2017 at 10:19 AM, Colin McCabe 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > So I think people have made some very good points so far.  There
> seems
> > > > to be agreement that we need to have explicit batch APIs for the
> sake of
> > > > efficiency, so I added that back.
> > > >
> > > > Contexts seem a little more complex than we thought, so I removed
> that
> > > > from the proposal.
> > > >
> > > > I removed the Impl class.  Instead, we now have a KafkaAdminClient
> > > > interface and an AdminClient implementation.  I think this matches
> our
> > > > other code better, as Jay commented.
> > > >
> > > > Each call now has an "Options" object that is passed in.  This will
> > > > allow us to easily add new parameters to the calls without having
> tons
> > > > of function overloads.  Similarly, each call now has a Results
> object,
> > > > which will let us easily extend the results we are returning if
> needed.
> > > >
> > > > Many people made the point that Java 7 Futures are not that useful,
> but
> > > > Java 8 CompletableFutures are.  With CompletableFutures, you can
> chain
> > > > calls, adapt them, join them-- basically all the stuff people are
> doing
> > > > in Node.js and Twisted Python.  Java 7 Futures don't really let you
> do
> > > > anything but poll for a value or block.  So I felt that it was
> better to
> > > > just go with a CompletableFuture-based API.
> > > >
> > > > People also made the point that they would like an easy API for
> waiting
> > > > on complete success of a batch call.  For example, an API that would
> > > > fail if even one topic wasn't created in createTopics.  So I came up
> > > > with Result objects that provide multiple futures that you can wait
> on.
> > > > You can wait on a future that fires when everything is complete, or
> you
> > > > can wait on futures for individual topics being created.
> > > >
> > > > I updated the wiki, so please take a look.  Note that this new API
> > > > requires JDK8.  It seems like JDK8 is coming soon, though, and the
> > > > disadvantages of sticking to Java 7 are pretty big here, I 

Re: [DISCUSS] KIP-82 - Add Record Headers

2017-03-01 Thread radai
@michael:

i used void because i'm used to java beans. thinking about it, i don't see
much use for returning false from adding a header: if the headers are in
read-only mode you should probably throw an IllegalStateException because let's
face it, 99% of users don't check return values.
returning "this" is probably more useful because it would allow chaining:

Headers.add().add().remove()
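as a hedged sketch of the two points above (throw IllegalStateException on writes once read-only, and return `this` to allow chaining) - this is illustrative only, not the API that was ultimately committed to Kafka, and the String-keyed MutableHeaders class is invented for the example:

```java
import java.util.*;

// Illustrative sketch: writes throw IllegalStateException once the headers are
// read-only, and add/remove return `this` so calls can be chained.
class MutableHeaders {
    private final List<String> keys = new ArrayList<>();
    private boolean readOnly = false;

    MutableHeaders add(String key) {
        checkWritable();
        keys.add(key);
        return this; // returning this (not void or boolean) is what enables chaining
    }

    MutableHeaders remove(String key) {
        checkWritable();
        keys.removeIf(k -> k.equals(key));
        return this;
    }

    // flip to read-only, e.g. once the record is handed off to producer internals
    void close() { readOnly = true; }

    int size() { return keys.size(); }

    private void checkWritable() {
        if (readOnly)
            throw new IllegalStateException("headers are read-only");
    }
}
```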

On Wed, Mar 1, 2017 at 6:47 AM, Ismael Juma  wrote:

> Hi Becket,
>
> Thanks for sharing your thoughts. More inline.
>
> On Wed, Mar 1, 2017 at 2:54 AM, Becket Qin  wrote:
>
> > As you can imagine if the ProducerRecord has a value as a List and the
> > Interceptor.onSend() can actually add an element to the List. If the
> > producer.send() is called on the same ProducerRecord again, the value
> list
> > would have one more element than the previously sent ProducerRecord even
> > though the ProducerRecord itself is not mutable, right? Same thing can
> > apply to any modifiable class type.
> >
>
> The difference is that the user chooses the value type. They are free to
> choose a mutable or immutable type. A generic interceptor cannot mutate the
> value because it doesn't know the type (and it could be immutable). One
> could write an interceptor that checked the type of the value at runtime
> and did things based on that. But again, the user who creates the record is
> in control.
>
> From this standpoint allowing headers to be mutable doesn't really weaken
> > the mutability we already have. Admittedly a mutable header is kind of
> > guiding user towards to change the headers in the existing object instead
> > of creating a new one.
>
>
> Yes, with headers, we are providing a model for the user (the user doesn't
> get to choose it like with keys and values) and for the interceptors. So, I
> think it's not the same.
>
>
> > But I think reusing an object while it is possible
> > to be modified by user code is a risk that users themselves are willing
> to
> > take. And we do not really protect against that.
>
>
> If users want to take that risk, it's fine. But we give them the option to
> protect themselves. With mutable headers, there is no option.
>
>
> > But there still seems
> > value to allow the users to not pay the overhead of creating tons of
> > objects if they do not reuse an object to send it twice, which is
> probably
> > a more common case.
> >
>
> I think the assumption that there would be tons of objects created is
> incorrect (I suggested a solution that would only involve one additional
> reference in the `Header` instance). The usability of the immutable API
> seemed to be more of an issue.
>
> In any case, if we do add the `close()` method, then we need to add a note
> to the compatibility section stating that once a producer record is sent,
> it cannot be sent again as this would cause interceptors that add headers
> to fail.
>
> Ismael
>


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-28 Thread radai
I will settle for any API really, but just wanted to point out that as it
stands right now the API targets the most "advanced" (hence obscure and
rare) use cases, at the expense of the simple and common ones. i'd suggest
(as the minimal set):

Header header(String key) - returns JUST ONE (the very last) value given a
key
Iterable<Header> headers(String key) - returns ALL under a key
Iterable<Header> headers() - returns all, period. maybe allow null as key
to prev method instead?
void add(Header header) - appends a header (key inside).
void remove(String key) - removes ALL HEADERS under a key.

this way naive get/set semantics map to header(key)/add(Header) cleanly and
simply while preserving the ability to handle more advanced use cases.
we can always add more convenience methods (like those dealing with lists -
addAll etc) but i think the 5 (potentially 4) above are sufficient for
basically everything.
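a minimal sketch of the five-method surface listed above, backed by a plain list so the "very last value wins" semantics of header(key) are concrete. names follow the email; this is not the committed Kafka Headers API:

```java
import java.util.*;

// Sketch of the proposed minimal header API; not the committed Kafka API.
class Header {
    final String key;
    final byte[] value;
    Header(String key, byte[] value) { this.key = key; this.value = value; }
}

class SimpleHeaders {
    private final List<Header> entries = new ArrayList<>();

    // returns JUST ONE (the very last) value given a key, or null
    Header header(String key) {
        for (int i = entries.size() - 1; i >= 0; i--)
            if (entries.get(i).key.equals(key))
                return entries.get(i);
        return null;
    }

    // returns ALL headers under a key, in insertion order
    List<Header> headers(String key) {
        List<Header> out = new ArrayList<>();
        for (Header h : entries)
            if (h.key.equals(key))
                out.add(h);
        return out;
    }

    // returns all, period
    List<Header> headers() { return entries; }

    // appends a header (key inside)
    void add(Header header) { entries.add(header); }

    // removes ALL headers under a key
    void remove(String key) { entries.removeIf(h -> h.key.equals(key)); }
}
```

with this shape, naive get/set callers only ever touch header(key) and add(Header), while multi-value consumers use the Iterable variants.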

On Tue, Feb 28, 2017 at 4:08 AM, Ismael Juma  wrote:

> Hi Becket,
>
> Comments inline.
>
> On Sat, Feb 25, 2017 at 10:33 PM, Becket Qin  wrote:
> >
> > 1. Regarding the mutability.
> >
> > I think it would be a big convenience to have headers mutable during
> > certain stage in the message life cycle for the use cases you mentioned.
> I
> > agree there is a material benefit especially given that we may have to
> > modify the headers for each message.
> >
> > That said, I also think it is fair to say that in the producer, in order
> to
> > guarantee the correctness of the entire logic, it is necessary that at
> some
> > point we need to make producer record immutable. For example we probably
> > don't want to see that users accidentally updated the headers when the
> > producer is doing the serialization or compression.
> >
> > Given that, would it be possible to make Headers to be able to switch
> from
> > mutable to immutable? We have done this for the Batch in the producer.
> For
> > example, initially the headers are mutable, but after it has gone through
> > all the interceptors, we can call Headers.close() to make it immutable
> > afterwards.
> >
>
> The difference is that the batch is an internal class that is not exposed
> to users. Can you please explain what happens if a user tries to send the
> same ProducerRecord twice? Would an interceptor fail when trying to mutate
> the header that is now closed? Or did you have something else in mind?
>
> Thanks,
> Ismael
>


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-27 Thread radai
r when needed, e.g. after having batch level interceptors.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Feb 24, 2017 at 3:56 PM, Michael Pearce <michael.pea...@ig.com>
> wrote:
>
> > KIP updated in response to the below comments:
> >
> >> 1. Is the intent of `Headers.filter` to include or exclude the
> > headers
> > > matching the key? Can you add a javadoc to clarify?
> > > 2. The KIP mentions that we will introduce V4 of FetchRequest
> > and V4 of
> > > ProduceRequest. Can you change this to say that the changes
> will
> > > piggyback
> > > onto V3 of ProduceRequest and V4 of FetchRequest which were
> > introduced
> > > in
> > > KIP-98?
> >
> >
> >
> >
> > On 24/02/2017, 23:20, "Michael Pearce" <michael.pea...@ig.com> wrote:
> >
> > We’re trying to make an eco-system for people to be able to use
> > headers, I think we want to ensure some least common features are
> supported
> > and not limited.
> >
> >
> > Some examples we have already.
> >
> > On consume interceptors a security interceptor may need to take the
> > current header, decrypt the data and replace the token with the next
> token
> > for the next processing, in case of a single decryption token being one
> > time use only.
> >
> > On produce it could be the interceptors add some values in the clear
> > from the systems that supply them, but later a security header
> interceptor
> > needs to encrypt some headers, as such needs to replace the current value
> > with new one.
> >
> > I note Radai already requested this in the thread, I assume he has
> > some use case also. S
> >
> > Simple add / remove is a least common feature.
> >
> > Rgds,
> > Mike
> >
> >
> > On 24/02/2017, 23:00, "Jason Gustafson" <ja...@confluent.io> wrote:
> >
> > Hey Michael,
> >
> > I'm not strongly opposed to them; I just don't see a lot of
> > benefit. One
> > thing it would be good to understand is why a consumer
> interceptor
> > would
> > need to add headers and why a producer interceptor would need to
> > remove
> > them. Maybe we only need the common cases?
> >
> > Thanks,
> > Jason
> >
> > On Fri, Feb 24, 2017 at 2:22 PM, Michael Pearce <
> > michael.pea...@ig.com>
> > wrote:
> >
> > > Hi Jason,
> > >
> > > Sorry I thought this was the agreed compromise to provide an
> api
> > that
> > > avoid boiler plate in return for immutabilty.
> > >
> > > If not then mutability will be needed as a goal is to have a
> > single clean
> > > method call to append/remove a header.
> > >
> > > Cheers
> > > Mike
> > >
> > > On 24/02/2017, 22:15, "Jason Gustafson" <ja...@confluent.io>
> > wrote:
> > >
> > > Hey Michael,
> > >
> > > I didn't actually comment on the new methods for
> > ProducerRecord and
> > > ConsumerRecord. If they only save some boilerplate, I'd
> just
> > as well
> > > not
> > > have them.
> > >
> > > Also a couple minor comments:
> > >
> > > 1. Is the intent of `Headers.filter` to include or exclude
> > the headers
> > > matching the key? Can you add a javadoc to clarify?
> > > 2. The KIP mentions that we will introduce V4 of
> > FetchRequest and V4 of
> > > ProduceRequest. Can you change this to say that the changes
> > will
> > > piggyback
> > > onto V3 of ProduceRequest and V4 of FetchRequest which were
> > introduced
> > > in
> > > KIP-98?
> > >
> > > The rest of the KIP looks good to me.
> > >
> > > -Jason
> > >
> > > On Fri, Feb 24, 2017 at 12:46 PM, Michael Pearce <
> > > michael.pea...@ig.com>
> > > wrote:
> > >
> > > > I’ve added the methods on the ProducerRecord that will
> > return a n

Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread radai
@jun: i wasn't concerned about tying up a request processing thread, but
IIUC the code does still read the entire request out, which might add up to
a non-negligible amount of memory.

On Thu, Feb 23, 2017 at 11:55 AM, Dong Lin  wrote:

> Hey Rajini,
>
> The current KIP says that the maximum delay will be reduced to window size
> if it is larger than the window size. I have a concern with this:
>
> 1) This essentially means that the user is allowed to exceed their quota
> over a long period of time. Can you provide an upper bound on this
> deviation?
>
> 2) What is the motivation for capping the maximum delay by the window size? I
> am wondering if there is a better alternative to address the problem.
>
> 3) It means that the existing metric-related config will have a more
> direct impact on the mechanism of this io-thread-unit-based quota. This
> may be an important change depending on the answer to 1) above. We probably
> need to document this more explicitly.
>
> Dong
>
>
> On Thu, Feb 23, 2017 at 10:56 AM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Yeah you are right. I thought it wasn't because at LinkedIn it will be
> too
> > much pressure on inGraph to expose those per-clientId metrics so we ended
> > up printing them periodically to local log. Never mind if it is not a
> > general problem.
> >
> > Hey Rajini,
> >
> > - I agree with Jay that we probably don't want to add a new field for
> > every quota ProduceResponse or FetchResponse. Is there any use-case for
> > having separate throttle-time fields for byte-rate-quota and
> > io-thread-unit-quota? You probably need to document this as interface
> > change if you plan to add new field in any request.
> >
> > - I don't think IOThread belongs to quotaType. The existing quota types
> > (i.e. Produce/Fetch/LeaderReplication/FollowerReplication) identify the
> > type of request that are throttled, not the quota mechanism that is
> applied.
> >
> > - If a request is throttled due to this io-thread-unit-based quota, is
> the
> > existing queue-size metric in ClientQuotaManager incremented?
> >
> > - In the interest of providing guide line for admin to decide
> > io-thread-unit-based quota and for user to understand its impact on their
> > traffic, would it be useful to have a metric that shows the overall
> > byte-rate per io-thread-unit? Can we also show this a per-clientId
> metric?
> >
> > Thanks,
> > Dong
> >
> >
> > On Thu, Feb 23, 2017 at 9:25 AM, Jun Rao  wrote:
> >
> >> Hi, Ismael,
> >>
> >> For #3, typically, an admin won't configure more io threads than CPU
> >> cores,
> >> but it's possible for an admin to start with fewer io threads than cores
> >> and grow that later on.
> >>
> >> Hi, Dong,
> >>
> >> I think the throttleTime sensor on the broker tells the admin whether a
> >> user/clentId is throttled or not.
> >>
> >> Hi, Radi,
> >>
> >> The reasoning for delaying the throttled requests on the broker instead
> of
> >> returning an error immediately is that the latter has no way to prevent
> >> the
> >> client from retrying immediately, which will make things worse. The
> >> delaying logic is based off a delay queue. A separate expiration thread
> >> just waits on the next to be expired request. So, it doesn't tie up a
> >> request handler thread.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Thu, Feb 23, 2017 at 9:07 AM, Ismael Juma  wrote:
> >>
> >> > Hi Jay,
> >> >
> >> > Regarding 1, I definitely like the simplicity of keeping a single
> >> throttle
> >> > time field in the response. The downside is that the client metrics
> >> will be
> >> > more coarse grained.
> >> >
> >> > Regarding 3, we have `leader.imbalance.per.broker.percentage` and
> >> > `log.cleaner.min.cleanable.ratio`.
> >> >
> >> > Ismael
> >> >
> >> > On Thu, Feb 23, 2017 at 4:43 PM, Jay Kreps  wrote:
> >> >
> >> > > A few minor comments:
> >> > >
> >> > >1. Isn't it the case that the throttling time response field
> should
> >> > have
> >> > >the total time your request was throttled irrespective of the
> >> quotas
> >> > > that
> >> > >caused that. Limiting it to byte rate quota doesn't make sense,
> >> but I
> >> > > also
> >> > >I don't think we want to end up adding new fields in the response
> >> for
> >> > > every
> >> > >single thing we quota, right?
> >> > >2. I don't think we should make this quota specifically about io
> >> > >threads. Once we introduce these quotas people set them and
> expect
> >> > them
> >> > > to
> >> > >be enforced (and if they aren't it may cause an outage). As a
> >> result
> >> > > they
> >> > >are a bit more sensitive than normal configs, I think. The
> current
> >> > > thread
> >> > >pools seem like something of an implementation detail and not the
> >> > level
> >> > > the
> >> > >user-facing quotas should be involved with. I think it might be
> >> better
> >> > > to
> >> > >make this a general 

Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread radai
i don't think time/cpu% are easy to reason about. most user-facing quota
systems i know (especially the commercial ones) focus on things users
understand better - iops and bytes.

as for quotas and "overhead" requests like heartbeats - on the one hand,
subjecting them to the quota may cause clients to die out. on the other hand,
not subjecting them to the quota opens the broker up to DOS attacks. how about
giving overhead requests their own quota, separate from "real"
(user-initiated?) requests? slightly more complicated, but i think it solves
the issue?

how long are requests held in purgatory? wouldn't this, at some point, still
cause resources to be taken? wouldn't it be better (for high enough delay
values) to just return an error to the client (quota exceeded, try again in
3 seconds)?

how would these work across an entire cluster? if these are enforced
independently on every single broker, you'd be hitting "monotonous" clients
(who interact with fewer partitions) much harder than clients who operate
across a lot of partitions.

On Thu, Feb 23, 2017 at 8:02 AM, Ismael Juma  wrote:

> Thanks for the KIP, Rajini. This is a welcome improvement and the KIP page
> covers it well. A few comments:
>
> 1. Can you expand a bit on the motivation for throttling requests that fail
> authorization for ClusterAction? Under what scenarios would this help?
>
> 2. I think we should rename `throttle_time_ms` in the new version of
> produce/fetch response to make it clear that it refers to the byte rate
> throttling. Also, it would be good to include the updated schema for the
> responses (we typically try to do that whenever we update protocol APIs).
>
> 3. I think I am OK with using absolute units, but I am not sure about the
> argument why it's better than a percentage. We are comparing request
> threads to CPUs, but they're not the same as increasing the number of
> request threads doesn't necessarily mean that the server can cope with more
> requests. In the example where we double the number of threads, all the
> existing users would still have the same capacity proportionally speaking
> so it seems intuitive to me. One thing that would be helpful, I think, is
> to describe a few scenarios where the setting needs to be adjusted and how
> users would go about doing it.
>
> 4. I think it's worth mentioning that TLS increases the load on the network
> thread significantly and for cases where there is mixed plaintext and TLS
> traffic, the existing byte rate throttling may not do a great job. I think
> it's OK to tackle this in a separate KIP, but worth mentioning the
> limitation.
>
> 5. We mention DoS attacks in the document. It may be worth mentioning that
> this mostly helps with clients that are not malicious. A malicious client
> could generate a large number of connections to counteract the delays that
> this KIP introduces. Kafka has connection limits per IP today, but not per
> user, so a distributed DoS could bypass those. This is not easy to solve at
> the Kafka level since the authentication step required to get the user may
> be costly enough that the brokers will eventually be overwhelmed.
>
> 6. It's unfortunate that the existing byte rate quota configs use
> underscores instead of dots (like every other config) as separators. It's
> reasonable for `io_thread_units` to use the same convention as the byte
> rate configs, but it's not great that we are adding to the inconsistency. I
> don't have any great solutions apart from perhaps accepting the dot
> notation for all these configs as well.
>
> Ismael
>
> On Fri, Feb 17, 2017 at 5:05 PM, Rajini Sivaram 
> wrote:
>
> > Hi all,
> >
> > I have just created KIP-124 to introduce request rate quotas to Kafka:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-124+-+
> > Request+rate+quotas
> >
> > The proposal is for a simple percentage request handling time quota that
> > can be allocated to **, ** or **. There
> > are a few other suggestions also under "Rejected alternatives". Feedback
> > and suggestions are welcome.
> >
> > Thank you...
> >
> > Regards,
> >
> > Rajini
> >
>


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-23 Thread radai
append-only would mean that if (for whatever reason) i want to replace a
header or strip it out, i'd need to copy the whole record.
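the guava Iterables.concat-style trick raised in this thread can be sketched as follows: an immutable view whose append shares the existing segments (only the short list of segment references is copied, never the header entries), while replace/strip has no choice but to rebuild everything - exactly the copy cost described above. the ImmutableHeaders class is invented for illustration:

```java
import java.util.*;

// Illustrative sketch of concat-style immutable append vs. rebuild-on-remove.
final class ImmutableHeaders {
    private final List<List<String>> segments;

    private ImmutableHeaders(List<List<String>> segments) { this.segments = segments; }

    static ImmutableHeaders of(String... keys) {
        return new ImmutableHeaders(Collections.singletonList(Arrays.asList(keys)));
    }

    // cheap append: a new view over the old segments plus one more;
    // only segment references are copied, not the entries themselves
    ImmutableHeaders append(String... keys) {
        List<List<String>> s = new ArrayList<>(segments);
        s.add(Arrays.asList(keys));
        return new ImmutableHeaders(s);
    }

    // removal cannot share structure; it must materialize a fresh flat list
    ImmutableHeaders remove(String key) {
        List<String> flat = new ArrayList<>();
        for (List<String> seg : segments)
            for (String k : seg)
                if (!k.equals(key))
                    flat.add(k);
        return new ImmutableHeaders(Collections.singletonList(flat));
    }

    List<String> toList() {
        List<String> flat = new ArrayList<>();
        for (List<String> seg : segments)
            flat.addAll(seg);
        return flat;
    }
}
```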



On Wed, Feb 22, 2017 at 5:09 PM, Michael Pearce 
wrote:

> Im happy to compromise to keep it mutable but move to an append style api.
> (as in guava interables concat)
>
> class Headers {
>    Headers append(Iterable<Header> headers);
> }
>
>
> I don’t think we’d want prepend, this would give the idea of guaranteed
> ordering, when in actual fact we don’t provide that guarantee (.e.g one
> client can put headerA, then headerB, but another could put headerB then
> headerA, this shouldn’t cause issues), Also what if we changed to a hashmap
> for the internal implementation, its just a bucket of entries no ordering.
> I think we just need to provide an api to add/append headers.
>
> This ok? If so ill update KIP to record this.
>
> Cheers
> Mike
>
> On 23/02/2017, 00:37, "Jason Gustafson"  wrote:
>
> The point about usability is fair. It's also reasonable to expect that
> common use cases such as appending headers should be done efficiently.
>
> Perhaps we could compromise with something like this?
>
> class Headers {
>  Headers append(Iterable<Header> headers);
>  Headers prepend(Iterable<Header> headers);
> }
>
> That retains ease of use while still giving ourselves some flexibility
> in
> the implementation.
>
> -Jason
>
>
> On Wed, Feb 22, 2017 at 3:03 PM, Michael Pearce  >
> wrote:
>
> > I wasn’t referring to the headers needing to be copied, im meaning
> the
> > fact we’d be forcing a new producer record to be created, with all
> the
> > contents copied.
> >
> > i.e what will happen is utility method will be created or end up
> being
> > used, which does this, and returns the new ProducerRecord instance.
> >
> > ProducerRecord  addHeader(ProducerRecord record, Header header){
> > Return New ProducerRecord(record.key, record.value,
> record.timestamp…..,
> > record.headers.concat(header))
> > }
> >
> > To me this seems ugly, but will be inevitable if we don’t make adding
> > headers to existing records a simple clean method call.
> >
> >
> >
> > On 22/02/2017, 22:57, "Michael Pearce" 
> wrote:
> >
> > Lazy init can achieve/avoid that.
> >
> > Re the concat, why don’t we implement that inside the Headers
> rather
> > than causing everyone to implement this as adding headers in
> interceptors
> > will be a dominant use case. We want a user friendly API. Having as
> a user
> > having to code this instead of having the headers handle this for me
> seems
> > redundant.
> >
> > On 22/02/2017, 22:34, "Jason Gustafson" 
> wrote:
> >
> > I thought the argument was against creating the extra objects
> > unnecessarily
> > (i.e. if they were not accessed). And note that making the
> Headers
> > immutable doesn't necessarily mean that they need to be
> copied:
> > you can do
> > a trick like Guava's Iterables.concat to add additional
> headers
> > without
> > changing the underlying collections.
> >
> > -Jason
> >
> > On Wed, Feb 22, 2017 at 2:22 PM, Michael Pearce <
> > michael.pea...@ig.com>
> > wrote:
> >
> > > If the argument for not having a map holding the key, value
> > pairs is due
> > > to garbage creation of HashMap entry's, forcing the
> creation of
> > a whole new
> > > producer record to simply add a head, surely is creating
> a-lot
> > more?
> > > 
> > > From: Jason Gustafson 
> > > Sent: Wednesday, February 22, 2017 10:09 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
> > >
> > > The current producer interceptor API is this:
> > >
> > > ProducerRecord onSend(ProducerRecord record);
> > >
> > > So adding a header means creating a new ProducerRecord
> with a
> > new header
> > > added to the current headers and returning it. Would that
> not
> > work?
> > >
> > > -Jason
> > >
> > > On Wed, Feb 22, 2017 at 1:45 PM, Michael Pearce <
> > michael.pea...@ig.com>
> > > wrote:
> > >
> > > > So how would you have this work if not mutable where
> > interceptors would
> > > > add headers?
> > > >
> > > > Sent using OWA for iPhone
> > > > 
> > > > From: Jason Gustafson 
>

Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-19 Thread radai
as previously
> expressed about the
> allocation of maps. An alternative would be to use arrays, i.e.
>
> class ConsumerRecord<K, V> {
> K key;
> V value;
> Header[] headers;
> }
>
> class Header {
>   String key;
>   byte[] value;
> }
>
> This would work nicely with the conventional array format and
> my guess is
> it would obviate the need do any lazy initialization. If we
> use the map as
> is currently documented, then it is possible with either
> representation to
> slice the headers and initialize them lazily. Either way, it
> might be a
> good idea to use a separate object to represent the headers in
> case we need
> to modify it in the future in some way.
>
> (By the way, doesn't it feel a bit odd that we seem to be
> designing a
> feature which is optimized for people not using?)
>
>
> If we can resolve these points, then at least you will get my
> vote.
>
> Thanks,
> Jason
>
> On Sun, Feb 19, 2017 at 7:30 AM, radai <
> radai.rosenbl...@gmail.com> wrote:
>
> > headers dont "leak" into application code. they are useful
> to application
> > code as well.
> > IIUC samza currently has headers "in-V" and would just
> switch over to kafka
> > headers if they exist.
> > im sure plenty of other users of kafka would have a use for
> headers.
> > im pretty sure use cases exist around shuffling data
> into/out-of kafka
> > (kafka connect or equivalent) where metadata from one end
> could copied over
> > to the other (S3, for example uses http headers for
> user-accessible
> > metadata). it will be kafka client code getting/setting
> those headers. not
> > an interceptor.
> >
> > On Fri, Feb 17, 2017 at 1:41 PM, Michael Pearce <
> michael.pea...@ig.com>
> > wrote:
> >
> > > For APM single event tracing, need access to the header at
> the point of
> > > processing on the processing thread.
> > >
> > > As such interceptors will not work/be suitable for these,
> due to the fact
> > > they act on the ConsumerRecords as a batch, before the
> handling thread
> > can
> > > split out and process per message which is the point these
> tools will
> > need
> > > to continue to transaction tracing.
> > >
> > > Like wise tools and infra pieces will need access to the
> message outside
> > > the interceptor realm.
> > >
> > >
> > >
> > > On 17/02/2017, 21:26, "Jason Gustafson" <
> ja...@confluent.io> wrote:
> > >
> > > >
> > > > That’s exactly what we’re doing the headers are a
> slice of bytes,
> > > which
> > > > then gets parsed later if needed, or can be parsed
> right away, the
> > > headers
> > > > is part of the protocol, so can still be validated
> if wanted.
> > > > If you had a header count then you would have to go
> through each
> > > header
> > > > key and value length value to work out how much to
> skip to get to
> > > say the
> > > > value or any future component in the message after
> the headers.
> > > Having it
> > > > as a byte[] with length value makes this a lot
> easier to skip.
> > >
> > >
> > > So the broker will parse the headers and validate
> them. Good. The
> > only
> > > reason remaining that I can see to leave the headers
> as a byte array
> > > is to
> > > make it easier for the client to skip past them. Are
> we sure this is
> > > not
> > > premature optimization? Are there any performance
> results which show
> > > that
> > 

Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-19 Thread radai
> > references),
> > > the
> > > > >> > complexity of this
> > > > >> > >> >> > compared to having a simpler
> protocol
> > was
> > > argued and
> > > > >> > agreed it
> > > > >> > >> wasn’t worth
> > > > >> > >> >> > the complexity as all other
> clients
> > in other
> > > languages
> > > > >> > would need to
> > > > >> > >> ensure
> > > > >> > >> >> > theyre using the right var size
> > algorithm, as
> > > there
> > > > is a
> > > > >> > few.
> > > > >> > >> >> >
> > > > >> > >> >> > On point 3)
> > > > >> > >> >> > We did the attributes, optional
> > approach as
> > > originally
> > > > >> > there was
> > > > >> > >> marked
> > > > >> > >> >> > concern that headers would
> cause a
> > message size
> > > > overhead
> > > > >> > for others,
> > > > >> > >> who
> > > > >> > >> >> > don’t want them. As such this
> is the
> > clean
> > > solution to
> > > > >> > achieve that.
> > > > >> > >> If
> > > > >> > >> >> > that no longer holds, and we
> don’t
> > care that we
> > > add
> > > > >> 4bytes
> > > > >> > overhead,
> > > > >> > >> then
> > > > >> > >> >> > im happy to remove.
> > > > >> > >> >> >
> > > > >> > >> >> > I’m personally in favour of
> keeping
> > the message
> > > as
> > > > small
> > > > >> > as possible
> > > > >> > >> so
> > > > >> > >> >> > people don’t get shocks in perf
> and
> > throughputs
> > > dues
> > > > to
> > > > >> > message size,
> > > > >> > >> >> > unless they actively use the
> feature,
> > as such I
> > > do
> > > > >> prefer
> > > > >> > the
> > > > >> > >> attribute bit
> > > > >> > >> >> > wise feature flag approach
> myself.
> > > > >> > >> >> >
> > > > >> > >> >> >
> > > > >> > >> >> >
> > > > >> > >> >> >
> > > > >> > >> >> > On 16/02/2017, 05:40, "Jason
> > Gustafson" <
> > > > >> > ja...@confluent.io> wrote:
> > > > >> > >> >> >
> > > > >> > >> >> > We have proposed a few
> > significant changes
> > > to the
> > > > >> > message format
> > > > >> > >> in
> > > > >> > >> >> > KIP-98
> > > > >> > >> >> > which now seems likely to
> pass
> 

Re: Cannot start up Kafka Server within Java

2017-02-15 Thread radai
kafka publishes a "test" artifact:
http://search.maven.org/#search|ga|1|a%3A%22kafka_2.12%22
if you introduce a test-scoped dep. on it, which in maven would look like:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>0.10.1.1</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>
you would get access to the trait KafkaServerTestHarness, which you can base
your unit tests on (warning - it's scala)

On Wed, Feb 15, 2017 at 8:33 AM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hi Stefan,
>
> The mailing list suppressed your attachment, so, it's hard to offer any
> advice without the error.  I'd suggest trying to post the error in a GitHub
> Gist, or some similar format.
>
> I dug up this mailing list archive about starting a Kafka server in-process
> that might help:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201608.mbox/%
> 3CCAJTjOQG5MdboQ9ciBrvZNLQzaDAvKhyQeW-mmz-br4bWy32B7A%40mail.gmail.com%3E
>  The whole thread (Automated Testing w/ Kafka Streams) has a few other
> approaches as well that you might be interested in trying.
>
> Mathieu
>
>
> On Wed, Feb 15, 2017 at 8:27 AM, Stefan Kölbl  wrote:
>
> > Dear Kafka dev team!
> >
> >
> >
> > I’m implementing a data stream processor with Apache Kafka for my
> > bachelor’s thesis and right now I’m struggling with trying to get the
> Kafka
> > Server started with Java.
> >
> >
> >
> > A short summary of what I’m doing currently:
> >
> >1. Start ZooKeeper (manually via terminal).
> >2. Start Kafka Server (manually via terminal).
> >3. Create Kafka Topic (manually via terminal, although it would be
> >auto-created by the producer anyway).
> >4. Run Kafka Producer written with the Kafka Clients API in Java.
> >5. Run Kafka Consumer written with the Kafka Clients API in Java.
> >
> >
> >
> > To further automate the testing process with JUnit, I’d like to be able
> > to start ZooKeeper and the Kafka Server with Java too, as I do with the
> > producer and the consumer.
> >
> > This should be possible, according to some examples I found online, using
> > the Scala source of Kafka (not Kafka Clients!):
> > http://www.programcreek.com/java-api-examples/index.php?
> > api=kafka.server.KafkaServer
> >
> > But whenever I try to create a new KafkaServer object, I get an error
> > (please see attached error.png).
> >
> >
> >
> > Here are my maven dependencies:
> >
> > <dependency>
> >     <groupId>org.apache.kafka</groupId>
> >     <artifactId>kafka-clients</artifactId>
> >     <version>0.10.1.1</version>
> > </dependency>
> > <dependency>
> >     <groupId>org.apache.kafka</groupId>
> >     <artifactId>kafka-streams</artifactId>
> >     <version>0.10.1.1</version>
> > </dependency>
> > <dependency>
> >     <groupId>org.apache.kafka</groupId>
> >     <artifactId>kafka_2.11</artifactId>
> >     <version>0.10.1.1</version>
> > </dependency>
> >
> >
> >
> > I somehow have the feeling I’m missing something crucial and easy-to-fix.
> > Neither a lot of google searches nor playing around with the parameters
> > (time, threadNamePrefix, kafkaMetricsReporters) provided to the
> KafkaServer
> > constructor could resolve my issue.
> >
> > Could you please help me? I’m stuck and don’t know what to do anymore.
> >
> > Thank you in advance.
> >
> >
> >
> > Best regards,
> >
> > Stefan Kölbl
> >
>
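For readers hitting the same constructor trouble: a hedged sketch (not verified against 0.10.1.1) of the commonly suggested workaround, `kafka.server.KafkaServerStartable`, whose single-argument constructor sidesteps `KafkaServer`'s Scala default arguments (time, threadNamePrefix, kafkaMetricsReporters) that Java callers would otherwise have to supply explicitly. The broker calls are left as comments because they need the kafka_2.11 jar and a running ZooKeeper; only the config-building part runs standalone.

```java
import java.util.Properties;

public class EmbeddedBrokerSketch {
    // Minimal broker config most embedded-broker examples use.
    static Properties brokerProps(String logDir) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("broker.id", "0");
        props.put("log.dirs", logDir);
        return props;
    }

    public static void main(String[] args) {
        Properties props = brokerProps("/tmp/embedded-kafka-logs");
        // With kafka_2.11 0.10.1.1 on the classpath, the broker itself would start with:
        //   KafkaServerStartable server = new KafkaServerStartable(new KafkaConfig(props));
        //   server.startup();
        //   ... run tests against localhost:9092 ...
        //   server.shutdown();
        //   server.awaitShutdown();
        System.out.println(props.getProperty("zookeeper.connect"));
    }
}
```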


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-15 Thread radai
@jun:
"Currently kafka-acl.sh just creates an ACL path in ZK with the principal
name string" - yes, but not directly. all it actually does it spin-up the
Authorizer and call Authorizer.addAcl() on it.
the vanilla Authorizer goes to ZK.
but generally speaking, users can plug in their own Authorizers (that can
store/load ACLs to/from wherever).

it would be nice if users who customize Authorizers (and PrincipalBuilders)
did not immediately lose the ability to use kafka-acl.sh with their new
Authorizers.
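To illustrate the flow radai describes — the tool instantiates whichever Authorizer is configured and calls addAcls on it — here is a self-contained sketch. The interface below is a deliberately simplified stand-in, not Kafka's actual `kafka.security.auth.Authorizer`; where the ACLs end up is entirely up to the implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AuthorizerSketch {
    // Simplified stand-in for the pluggable Authorizer contract.
    interface Authorizer {
        void addAcls(Set<String> acls, String resource);
        Set<String> getAcls(String resource);
    }

    // The vanilla implementation would write to ZooKeeper; a custom one can use any store.
    static class InMemoryAuthorizer implements Authorizer {
        private final Map<String, Set<String>> store = new HashMap<>();
        public void addAcls(Set<String> acls, String resource) {
            store.computeIfAbsent(resource, r -> new HashSet<>()).addAll(acls);
        }
        public Set<String> getAcls(String resource) {
            return store.getOrDefault(resource, new HashSet<>());
        }
    }

    public static void main(String[] args) {
        Authorizer authorizer = new InMemoryAuthorizer();  // what kafka-acls.sh would spin up
        Set<String> acls = new HashSet<>();
        acls.add("User:alice has Allow permission for Read");
        authorizer.addAcls(acls, "Topic:payments");
        System.out.println(authorizer.getAcls("Topic:payments").size());
    }
}
```

Because the tool only talks to the interface, a custom Authorizer plugged in this way keeps working with the standard command line.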

On Wed, Feb 15, 2017 at 5:50 AM, Manikumar <manikumar.re...@gmail.com>
wrote:

> Sorry, I am late to this discussion.
>
> PrincipalBuilder is only used for SSL Protocol.
> For SASL, we use "sasl.kerberos.principal.to.local.rules" config to map
> SASL principal names to short names. To make it consistent,
> Do we also need to pass the SASL full principal name to authorizer ?
> We may need to use PrincipalBuilder for mapping SASL names.
>
> Related JIRA is here:
> https://issues.apache.org/jira/browse/KAFKA-2854
>
>
> On Wed, Feb 15, 2017 at 7:47 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Radai,
> >
> > Currently kafka-acl.sh just creates an ACL path in ZK with the principal
> > name string. The authorizer module in the broker reads the principal name
> > string from the acl path in ZK and creates the expected KafkaPrincipal
> for
> > matching. As you can see, the expected principal is created on the broker
> > side, not by the kafka-acl.sh tool. The broker already has the ability to
> > configure PrincipalBuilder. That's why I am not sure if there is a need
> for
> > kafka-acl.sh to customize PrincipalBuilder.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Feb 13, 2017 at 7:01 PM, radai <radai.rosenbl...@gmail.com>
> wrote:
> >
> > > if i understand correctly, kafka-acls.sh spins up an instance of (the
> > > custom, in our case) Authorizer, and calls things like addAcls(acls:
> > > Set[Acl], resource: Resource) on it, which are defined in the
> interface,
> > > hence expected to be "extensible".
> > >
> > > (side note: if Authorizer and PrincipalBuilder are defined as
> extensible
> > > interfaces, why doesnt class Acl, which is in the signature for
> > Authorizer
> > > calls, use java.security.Principal?)
> > >
> > > we would like to be able to use the standard kafka-acl command line for
> > > defining ACLs even when replacing the vanilla Authorizer and
> > > PrincipalBuilder (even though we have a management UI for these
> > operations
> > > within linkedin) - simply because thats the correct thing to do from an
> > > extensibility point of view.
> > >
> > > On Mon, Feb 13, 2017 at 1:39 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Mayuresh,
> > > >
> > > > It seems to me that there are two common use cases of authorizer. (1)
> > Use
> > > > the default SimpleAuthorizer and the kafka-acl to do authorization.
> (2)
> > > Use
> > > > a customized authorizer and an external tool for authorization. Do
> you
> > > > think there is a use case for a customized authorizer and kafka-acl
> at
> > > the
> > > > same time? If not, it's better not to complicate the kafka-acl api.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > >
> > > > On Mon, Feb 13, 2017 at 10:35 AM, Mayuresh Gharat <
> > > > gharatmayures...@gmail.com> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thanks for the review and comments. Please find the replies inline:
> > > > >
> > > > > This is so that in the future, we can extend to types like group.
> > > > > ---> Yep, I did think the same. But since the SocketServer was always
> > > > > creating User type, it wasn't actually used. If we go ahead with changes
> > > > > in this KIP, we will give this power of creating different Principal types
> > > > > to the PrincipalBuilder (which users can define their own). In that way
> > > > > Kafka will not have to deal with handling this. So the Principal building
> > > > > and Authorization will be opaque to Kafka which seems like an expected
> > > > > behavior.

Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-15 Thread radai
I've trimmed the inline contents as this mail is getting too big for the
apache mailing list software to deliver :-(

1. the important thing for interoperability is for different "interested
parties" (plugins, infra layers/wrappers, user-code) to be able to stick
pieces of metadata onto msgs without getting in each other's way. a common
key scheme (Strings, as of the time of this writing?) is all thats required
for that. it is assumed that the other end interested in any such piece of
metadata knows the encoding, and byte[] provides for the most flexibility.
i believe this is the same logic behind core kafka being byte[]/byte[] -
Strings are more "usable" but bytes are flexible and so were chosen.
Also - core kafka doesnt even do that good of a job on usability of the
payload (example - i have to specify the nop byte[] "decoders" explicitly
in conf), and again sacrifices usability for the sake of performance (no
convenient single-record processing as poll is a batch, lots of obscure
little config details exposing internals of the batching mechanism, etc)
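A minimal sketch of the scheme argued for above: String keys for interoperability, byte[] values for flexibility, with each interested party reading and writing only the keys it knows about. `RecordHeadersSketch` is a hypothetical stand-in, not the eventual KIP-82 API.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class RecordHeadersSketch {
    // String key -> opaque byte[] value; the reader is assumed to know the encoding.
    private final Map<String, byte[]> headers = new LinkedHashMap<>();

    void add(String key, byte[] value) { headers.put(key, value); }
    byte[] get(String key) { return headers.get(key); }

    public static void main(String[] args) {
        RecordHeadersSketch h = new RecordHeadersSketch();
        // An infra wrapper and a user plugin attach metadata without colliding,
        // because each uses its own key namespace:
        h.add("tracing.span-id", "span-42".getBytes(StandardCharsets.UTF_8));
        h.add("myplugin.flags", new byte[] {0x01});  // arbitrary binary is legal
        System.out.println(new String(h.get("tracing.span-id"), StandardCharsets.UTF_8));
    }
}
```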

this is also why i really dislike the idea of a "type system" for header
values, it further degrades the usability, adds complexity and will
eventually get in people's way, also, it would be the 2nd/3rd home-grown
serialization mechanism in core kafka (counting 2 iterations of the "type
definition DSL")

2. this is an implementation detail, and not even a very "user facing" one?
to the best of my understanding the vote process is on proposed
API/behaviour. also - since we're willing to go with strings just serialize
a 0-sized header blob and IIUC you dont need any optionals anymore.

3. yes, we can :-)

On Tue, Feb 14, 2017 at 11:56 PM, Michael Pearce 
wrote:

> Hi Jay,
>
> 1) There was some initial debate on the value part, as youll note String,
> String headers were discounted early on. The reason for this is flexibility
> and keeping in line with the flexibility of key, value of the message
> object itself. I don’t think it takes away from an ecosystem as each plugin
> will care for their own key, this way ints, booleans , exotic custom binary
> can all be catered for.
> a. If you really wanted to push for a typed value interface, I wouldn’t
> want just String values supported, but the primitives plus string and
> also still keeping the ability to have a binary for custom binaries that
> some organisations may have.
> i. I have written this slight alternative here, https://cwiki.apache.org/
> confluence/display/KAFKA/KIP-82+-+Add+Record+Headers+-+Typed
> ii. Essentially the value bytes, has a leading byte overhead.
> 1.  This tells you what type the value is, before reading the rest of the
> bytes, allowing serialisation/deserialization to and from the primitives,
> string and byte[]. This is akin to some other messaging systems.
> 2) We are making it optional, so that for those not wanting headers have 0
> bytes overhead (think of it as a feature flag), I don’t think this is
> complex, especially if comparing to changes proposed in other kips like
> kip-98.
> a. If you really really don’t like this, we can drop it, but it would mean
> buying into 4 bytes extra overhead for users who do not want to use headers.
> 3) In the summary yes, it is at a higher level, but I think this is well
> documented in the proposed changes section.
> a. Added getHeaders method to Producer/Consumer record (that is it)
> b. We’ve also detailed the new Headers class that this method returns that
> encapsulates the headers protocol and logic.
>
> Best,
> Mike
>
> ==Original questions from the vote thread from Jay.==
>
> Couple of things I think we still need to work out:
>
>1. I think we agree about the key, but I think we haven't talked about
>the value yet. I think if our goal is an open ecosystem of these header
>spread across many plugins from many systems we should consider making
> this
>a string as well so it can be printed, set via a UI, set in config, etc.
>Basically encouraging pluggable serialization formats here will lead to
> a
>bit of a tower of babel.
>2. This proposal still includes a pretty big change to our serialization
>and protocol definition layer. Essentially it is introducing an optional
>type, where the format is data dependent. I think this is actually a big
>change though it doesn't seem like it. It means you can no longer
> specify
>this type with our type definition DSL, and likewise it requires custom
>handling in client libs. This isn't a huge thing, since the Record
>definition is custom anyway, but I think this kind of protocol
>inconsistency is very non-desirable and ties you to hand-coding things.
> I
>think the type should instead by [Key Value] in our BNF, where key and
>value are both short strings as used elsewhere. This brings it in line
> with
>the rest of the protocol.
>3. Could we get more specific about the exact Java API change to

Re: [VOTE] KIP-82 Add Record Headers

2017-02-14 Thread radai
+1 from me.

also - a more usable link to the discussion thread:
http://markmail.org/message/x5wlkieexinovsz3

On Tue, Feb 14, 2017 at 9:42 AM, Michael Pearce 
wrote:

> Hi all,
>
> We would like to start the voting process for KIP-82 – Add record headers.
> The KIP can be found
> at
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 82+-+Add+Record+Headers
>
> Discussion thread(s) can be found here:
>
> http://search-hadoop.com/m/Kafka/uyzND1nSTOHTvj81?subj=
> Re+DISCUSS+KIP+82+Add+Record+Headers
> http://search-hadoop.com/m/Kafka/uyzND1Arxt22Tvj81?subj=
> Re+DISCUSS+KIP+82+Add+Record+Headers
> http://search-hadoop.com/?project=Kafka=KIP-82
>
>
>
> Thanks,
> Mike
>
> The information contained in this email is strictly confidential and for
> the use of the addressee only, unless otherwise indicated. If you are not
> the intended recipient, please do not read, copy, use or disclose to others
> this message or any attachment. Please also notify the sender by replying
> to this email or by telephone (+44(020 7896 0011) and then delete the email
> and any copies of it. Opinions, conclusion (etc) that do not relate to the
> official business of this company shall be understood as neither given nor
> endorsed by it. IG is a trading name of IG Markets Limited (a company
> registered in England and Wales, company number 04008957) and IG Index
> Limited (a company registered in England and Wales, company number
> 01190902). Registered address at Cannon Bridge House, 25 Dowgate Hill,
> London EC4R 2YA. Both IG Markets Limited (register number 195355) and IG
> Index Limited (register number 114059) are authorised and regulated by the
> Financial Conduct Authority.
>


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-13 Thread radai
1. making the client Closeable/AutoCloseable would allow try (Client = ...)
{} without the need to finally close.
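A quick sketch of point 1: if the client implements AutoCloseable, try-with-resources closes it deterministically without an explicit finally block. `AdminClient` below is a trivial stand-in, not the proposed Kafka class.

```java
public class TryWithResourcesSketch {
    static class AdminClient implements AutoCloseable {
        boolean closed = false;
        void createTopic(String name) { /* would issue a request to the cluster */ }
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        AdminClient observed;
        try (AdminClient client = new AdminClient()) {
            client.createTopic("demo");
            observed = client;
        }  // close() runs here, even if createTopic had thrown
        System.out.println(observed.closed);  // true
    }
}
```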

2. a "stream processing unit" (producer + consumer) currently holds 2 open
sockets to every broker it interacts with, because producer and consumer
dont share the network stack. if we use the admin API to auto cleanup on
commit for intermediate pipelines (which is one of our use cases) this
figure goes up to 3 sockets per unit of processing per broker. beyond
becoming a scalability issue this (i think) might also introduce annoying
bugs due to some (but not all) of these connections being down. this is not
an issue of this KIP though.

On Mon, Feb 13, 2017 at 11:51 AM, Colin McCabe  wrote:

> On Sun, Feb 12, 2017, at 09:21, Jay Kreps wrote:
> > Hey Colin,
> >
> > Thanks for the hard work on this. I know going back and forth on APIs is
> > kind of frustrating but we're at the point where these things live long
> > enough and are used by enough people that it is worth the pain. I'm sure
> > it'll come down in the right place eventually. A couple things I've found
> > helped in the past:
> >
> >1. The burden of evidence needs to fall on the complicator. i.e. if
> >person X thinks the api should be async they need to produce a set of
> >common use cases that require this. Otherwise you are perpetually
> >having to
> >think "we might need x". I think it is good to have a rule of "simple
> >until
> >proven insufficient".
> >2. Make sure we frame things for the intended audience. At this point
> >our apis get used by a very broad set of Java engineers. This is a
> >very
> >different audience from our developer mailing list. These people code
> >for a
> >living not necessarily as a passion, and may not understand details of
> >the
> >internals of our system or even basic things like multi-threaded
> >programming. I don't think this means we want to dumb things down, but
> >rather try really hard to make things truly simple when possible.
> >
> > Okay here were a couple of comments:
> >
> >1. Conceptually what is a TopicContext? I think it means something
> >like
> >TopicAdmin? It is not literally context about Topics right? What is
> >the
> >relationship of Contexts to clients? Is there a threadsafety
> >difference?
> >Would be nice to not have to think about this, this is what I mean by
> >"conceptual weight". We introduce a new concept that is a bit nebulous
> >that
> >I have to figure out to use what could be a simple api. I'm sure
> >you've
> >been through this experience before where you have these various
> >objects
> >and you're trying to figure out what they represent (the connection to
> >the
> >server? the information to create a connection? a request session?).
>
> The intention was to provide some grouping of methods, and also a place
> to put request parameters which were often set to defaults rather than
> being explicitly set.  If it seems complex, we can certainly get rid of
> it.
>
> >2. We've tried to avoid the Impl naming convention. In general the
> >rule
> >has been if there is only going to be one implementation you don't
> >need an
> >interface. If there will be multiple, distinguish it from the others.
> >The
> >other clients follow this pattern: Producer, KafkaProducer,
> >MockProducer;
> >Consumer, KafkaConsumer, MockConsumer.
>
> Good point.  Let's change the interface to KafkaAdminClient, and the
> implementation to AdminClient.
>
> >3. We generally don't use setters or getters as a naming convention. I
> >personally think mutating the setting in place seems kind of like late
> >90s
> >Java style. I think it likely has thread-safety issues. i.e. even if
> >it is
> >volatile you may not get the value you just set if there is another
> >thread... I actually really liked what you described as your original
> >idea
> >of having a single parameter object like CreateTopicRequest that holds
> >all
> >these parameters and defaults. This lets you evolve the api with all
> >the
> >various combinations of arguments without overloading insanity. After
> >doing
> >literally tens of thousands of remote APIs at LinkedIn we eventually
> >converged on a rule, which is ultimately every remote api needs a
> >single
> >argument object you can add to over time and it must be batched. Which
> >brings me to my next point...
>
> Just to clarify, volatiles were never a part of the proposal.  I think
> that context objects or request objects should be used by a single
> thread at a time.
>
> I'm not opposed to request objects, but I think they raise all the same
> questions as context objects.  Basically, the thread-safety issues need
> to be spelled out and understood by the user, and the user needs more
> lines of code to make a request. 
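A hedged sketch of the "single, batched argument object" rule from the discussion: every parameter lives on one request object that can grow new optional fields over time without overload explosion, and the payload is a batch by construction. All names below are illustrative, not the KIP-117 API.

```java
import java.util.ArrayList;
import java.util.List;

public class RequestObjectSketch {
    static class NewTopic {
        final String name; final int partitions;
        NewTopic(String name, int partitions) { this.name = name; this.partitions = partitions; }
    }

    static class CreateTopicsRequest {
        final List<NewTopic> topics = new ArrayList<>();  // batched by construction
        int timeoutMs = 30_000;                           // optional field with a default

        CreateTopicsRequest add(NewTopic t) { topics.add(t); return this; }
        CreateTopicsRequest timeoutMs(int ms) { this.timeoutMs = ms; return this; }
    }

    public static void main(String[] args) {
        // A new optional field later only adds one builder method; call sites don't break.
        CreateTopicsRequest req = new CreateTopicsRequest()
                .add(new NewTopic("events", 8))
                .add(new NewTopic("audit", 1))
                .timeoutMs(10_000);
        System.out.println(req.topics.size() + " " + req.timeoutMs);  // 2 10000
    }
}
```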

Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-13 Thread radai
if i understand correctly, kafka-acls.sh spins up an instance of (the
custom, in our case) Authorizer, and calls things like addAcls(acls:
Set[Acl], resource: Resource) on it, which are defined in the interface,
hence expected to be "extensible".

(side note: if Authorizer and PrincipalBuilder are defined as extensible
interfaces, why doesnt class Acl, which is in the signature for Authorizer
calls, use java.security.Principal?)

we would like to be able to use the standard kafka-acl command line for
defining ACLs even when replacing the vanilla Authorizer and
PrincipalBuilder (even though we have a management UI for these operations
within linkedin) - simply because thats the correct thing to do from an
extensibility point of view.

On Mon, Feb 13, 2017 at 1:39 PM, Jun Rao  wrote:

> Hi, Mayuresh,
>
> It seems to me that there are two common use cases of authorizer. (1) Use
> the default SimpleAuthorizer and the kafka-acl to do authorization. (2) Use
> a customized authorizer and an external tool for authorization. Do you
> think there is a use case for a customized authorizer and kafka-acl at the
> same time? If not, it's better not to complicate the kafka-acl api.
>
> Thanks,
>
> Jun
>
>
>
> On Mon, Feb 13, 2017 at 10:35 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Thanks for the review and comments. Please find the replies inline :
> >
> > This is so that in the future, we can extend to types like group.
> > ---> Yep, I did think the same. But since the SocketServer was always
> > creating User type, it wasn't actually used. If we go ahead with changes
> in
> > this KIP, we will give this power of creating different Principal types
> to
> > the PrincipalBuilder (which users can define their own). In that way
> Kafka
> > will not have to deal with handling this. So the Principal building and
> > Authorization will be opaque to Kafka which seems like an expected
> > behavior.
> >
> >
> > Hmm, normally, the configurations you specify for plug-ins refer to those
> > needed to construct the plug-in object. So, it's kind of weird to use
> that
> > to call a method. For example, why can't principalBuilderService.rest.
> url
> > be passed in through the configure() method and the implementation can
> use
> > that to build principal. This way, there is only a single method to
> compute
> > the principal in a consistent way in the broker and in the kafka-acl
> tool.
> > > We can do that as well. But since the rest url is not related to
> the
> > Principal, it seems out of place to me to pass it every time we have to
> > create a Principal. I should replace "principalConfigs" with
> > "principalProperties".
> > I was trying to differentiate the configs/properties that are used to
> > create the PrincipalBuilder class and the Principal/Principals itself.
> >
> >
> > For LinkedIn's use case, do you actually use the kafka-acl tool? My
> > understanding is that LinkedIn does authorization through an external
> tool.
> > > For Linkedin's use case we don't actually use the kafka-acl tool
> > right now. As per the discussion that we had on
> > https://issues.apache.org/jira/browse/KAFKA-4454, we thought that it
> would
> > be good to make kafka-acl tool changes, to make it flexible and we might
> be
> > even able to use it in future.
> >
> > It seems it's simpler if kafka-acl doesn't to need to understand the
> > principal builder. The tool does authorization based on a string name,
> > which is expected to match the principal name. So, I am wondering why the
> > tool needs to know the principal builder.
> > > If we don't make this change, I am not sure how clients/end users
> > will be able to use this tool if they have there own Authorizer that does
> > Authorization based on Principal, that has more information apart from
> name
> > and type.
> >
> > What if we only make the following changes: pass the java principal in
> > session and in
> > SimpleAuthorizer, construct KafkaPrincipal from java principal name. Will
> > that work for LinkedIn?
> > > This can work for Linkedin but as explained above, it does not seem
> > like a complete design from open source point of view.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Thu, Feb 9, 2017 at 11:29 AM, Jun Rao  wrote:
> >
> > > Hi, Mayuresh,
> > >
> > > Thanks for the reply. A few more comments below.
> > >
> > > On Wed, Feb 8, 2017 at 9:14 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for the review. Please find the responses inline.
> > > >
> > > > 1. It seems the problem that you are trying to address is that java
> > > > principal returned from KafkaChannel may have additional fields than
> > name
> > > > that are needed during authorization. Have you considered a
> customized
> > > > PrincipleBuilder that extracts all needed fields from java principal
> > and
> > > > squeezes them as a json in the name of the 

Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-07 Thread radai
+1.

under ismael's "facet" approach we could always start with
AdminClient.topics() and then pile on more of them later.

On Tue, Feb 7, 2017 at 8:57 AM, Grant Henke  wrote:

> +1 I think its important to focus this KIP discussion on the "patterns" we
> would like to see in the client and a few key methods in order to make
> progress and then iterate from there.
>
> I think we should let Colin drive the APIs he thinks are important since he
> is volunteering to do the work. And then others can propose and add APIs
> from there.
>
> On Tue, Feb 7, 2017 at 10:37 AM, Ismael Juma  wrote:
>
> > Hi all,
> >
> > I think it's good that we have discussed a number of APIs that would make
> > sense in the AdminClient. Having done that, I think we should now narrow
> > the focus of this KIP to a small set of methods to get us started. Once
> we
> > have an AdminClient in the codebase, we can propose subsequent KIPs to
> > enrich it. I would suggest the following:
> >
> > 1. Let's focus on topic management operations: describe, create, alter
> and
> > delete topics.
> > 2. Let's add an @Unstable annotation to the class and specify in the
> > javadoc that the methods are subject to change (if necessary).
> >
> > Thoughts?
> >
> > Ismael
> >
> > On Fri, Feb 3, 2017 at 6:24 PM, Colin McCabe  wrote:
> >
> > > On Thu, Feb 2, 2017, at 21:45, Becket Qin wrote:
> > > > Hi Colin,
> > > >
> > > > Thanks for the KIP. An admin client is probably a must after we block
> > > > direct access to ZK. Some comments and thoughts below:
> > > >
> > > > 1. Do we have a clear scope for the admin client? It might be worth
> > > > thinking about the entire user experience of the admin client.
> Ideally
> > we
> > > > may want to have a single client to do all the administrative work
> > > > instead
> > > > of having multiple ways to do different things. For example, do we
> want
> > > > to
> > > > add topic configurations change API in the admin client? What about
> > > > partition movements and preferred leader election? Those are also
> > > > administrative tasks which seem reasonable to be integrated into the
> > > > admin
> > > > client.
> > >
> > > Thanks for the comments, Becket!
> > >
> > > I agree that topic configuration change should be in the administrative
> > > client.  I have not thought about partition movement or preferred
> leader
> > > election.  It probably makes sense to put them in the client as well,
> > > but we should probably have a longer discussion about those features
> > > later when someone is ready to implement them ;)
> > >
> > > best,
> > > Colin
> >
>
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-06 Thread radai
even assuming all consumers use kafka for offset storage, would it be
possible to get this information from a single broker without "reaching
out" to all brokers in a cluster?

On Mon, Feb 6, 2017 at 2:05 PM, Jianbin Wei 
wrote:

> In the specify group information, can we also return information like
> partition assignment for each member, the lag/offset of each
> member/partition?  It would be useful for Ops/Admin regarding the health of
> the consumer group.
>
> Regards,
>
> -- Jianbin
>
> > On Feb 6, 2017, at 13:54, Guozhang Wang  wrote:
> >
> > Some follow-up on 2) / 3) below.
> >
> > On Mon, Feb 6, 2017 at 11:21 AM, Colin McCabe  wrote:
> >
> >> On Fri, Feb 3, 2017, at 16:25, Guozhang Wang wrote:
> >>> Thanks for the proposal Colin. A few comments below:
> >>
> >> Thanks for taking a look, Guozhang.
> >>
> >>>
> >>> 1. There are a couple of classes that looks new to me but not defined
> >>> anywhere. For example: NewTopic (topic name and configs?), TopicInfo
> (is
> >>> this a wrapper of MetadataResponse.TopicMetadata?), NodeApiVersions,
> >>> GroupOverview.
> >>> Could you provide their class definitions?
> >>
> >> Good point.  I will add them in the KIP.
> >>
> >> NodeApiVersions is at
> >> ./clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
> >>
> >>>
> >>> 2. In Streams, we would like to replace its own `
> >>> org.apache.kafka.streams.processor.internals.StreamsKafkaClient` class
> >>> with
> >>> this new admin client.  One additional request though, is that for
> create
> >>> /
> >>> delete topics, we'd like to use a different "flag" as BLOCKING, which
> >>> means
> >>> the response will not be sent back until the controller has updated its
> >>> own
> >>> metadata cache for the topic, and even STRICT_BLOCKING, which means the
> >>> response will not be sent back until the metadata has been propagated
> to
> >>> the whole cluster.
> >>
> >> Hmm.  It seems like this would require additional RPCs or changes to
> >> existing RPCs on the server.  So we should handle this in a follow-on
> >> KIP, I think.
> >>
> >>
> > I agree for STRICT_BLOCKING, while for BLOCKING, it is already supported
> as
> > of today I think, and Streams' KafkaClient is using that mechanism as
> well.
> >
> >
> >>>
> >>> 3. I'm wondering what's the usage of "public Map<Node, Try<Collection<GroupOverview>>> getAllGroups()", or rather, would it be more
> >>> useful to get a specific group information given the group id?
> Otherwise
> >>> we
> >>> need to query each one of the coordinator.
> >>
> >> That's a good point.  We should have an API that gets information about
> >> a specific group, by querying only the coordinator for that group.  By
> >> the way, what specific group information should we expose, besides name
> >> and protocolType?
> >>
> >>
> > I think these can all be returned?
> >
> > (groupID, protocolType, generationID, state, members: [memberID,
> > clientHost], leaderID (nullable) )
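The field list above maps naturally onto a simple immutable value class; this shape is purely illustrative, and the KIP's eventual classes may differ.

```java
import java.util.Arrays;
import java.util.List;

public class GroupDescriptionSketch {
    static class MemberInfo {
        final String memberId, clientHost;
        MemberInfo(String memberId, String clientHost) {
            this.memberId = memberId; this.clientHost = clientHost;
        }
    }

    static class GroupDescription {
        final String groupId, protocolType, state, leaderId;  // leaderId may be null
        final int generationId;
        final List<MemberInfo> members;
        GroupDescription(String groupId, String protocolType, int generationId,
                         String state, List<MemberInfo> members, String leaderId) {
            this.groupId = groupId; this.protocolType = protocolType;
            this.generationId = generationId; this.state = state;
            this.members = members; this.leaderId = leaderId;
        }
    }

    public static void main(String[] args) {
        GroupDescription g = new GroupDescription("my-group", "consumer", 3, "Stable",
                Arrays.asList(new MemberInfo("m-1", "10.0.0.5")), "m-1");
        System.out.println(g.groupId + " " + g.members.size());
    }
}
```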
> >
> >
> >>>
> >>> 4. I'm +1 with Ismael's suggestion for having the AdminClient interface
> >>> with a KafkaAdminClient impl, this at least allows easier mocks for
> unit
> >>> testing.
> >>
> >> Yeah, I agree.  Hopefully that will also make it a little clearer what
> >> the boundary is between the internal functions and classes and the
> >> public API.  I'll update the KIP accordingly.
> >>
> >> thanks,
> >> Colin
> >>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>
> >>> On Fri, Feb 3, 2017 at 10:40 AM, Colin McCabe 
> >> wrote:
> >>>
>  On Thu, Feb 2, 2017, at 15:02, Ismael Juma wrote:
> > Hi Colin,
> >
> > Thanks for the KIP, great to make progress on this. I have some
> >> initial
> > comments, will post more later:
> >
> > 1. We have KafkaProducer that implements the Producer interface and
> > KafkaConsumer that implements the Consumer interface. Maybe we could
> >> have
> > KafkaAdminClient that implements the AdminClient interface? Or maybe
> >> just
> > KafkaAdmin. Not sure, some ideas for consideration. Also, I don't
> >> think
> > we
> > should worry about a name clash with the internal AdminClient
> >> written in
> > Scala. That will go away soon enough and choosing a good name for the
> > public class is what matters.
> 
>  Hi Ismael,
> 
>  Thanks for taking a look.
> 
>  I guess my thought process was that users might find it confusing if
> >> the
>  public API and the old private API had the same name.  "What do you
>  mean, I have to upgrade to release X to get AdminClient, I have it
> >> right
>  here?"  I do have a slight preference for the shorter name, though, so
>  if this isn't a worry, we can change it to AdminClient.
> 
> >
> > 2. We should include the proposed package name in the KIP
> > (probably org.apache.kafka.clients.admin?).
> 
>  Good idea.  I will add the package name to 

Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-03 Thread radai
+1

On Fri, Feb 3, 2017 at 11:25 AM, Mayuresh Gharat  wrote:

> Hi All,
>
> It seems that there is no further concern with the KIP-111. At this point
> we would like to start the voting process. The KIP can be found at
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67638388
>
> Thanks,
>
> Mayuresh
>


Re: [DISCUS] consuming messages is polling - is there a reason? new KIP for poll?

2017-02-02 Thread radai
kafka relies on the underlying OS' page cache for serving "popular" data.
so "pre-assembling" push batches would move from page cache to heap
storage, which is not as appealing.
also, for trivial cases a lot of consumers read the same thing, which would
make the heap caching even worse.

also - i don't think you need to shorten fetch.max.wait.ms to get lower
delays - you could still configure a relatively long fetch.max.wait.ms and
have the broker answer your poll the minute _any_ messages are available.
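as a sketch, the long-poll tuning described above boils down to two consumer
settings (values here are illustrative assumptions, not recommendations):

```properties
# broker holds the fetch until fetch.min.bytes are available,
# or until fetch.max.wait.ms elapses - whichever comes first
fetch.max.wait.ms=500
# with min bytes = 1 the broker responds the moment any message arrives
fetch.min.bytes=1
```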

On Wed, Feb 1, 2017 at 2:46 AM, Alexander Binzberger <
alexander.binzber...@wingcon.com> wrote:

> ave very few. I don't see how push would cost more CPU time or resources
> on the broker then polling with a lot of consumers very frequently.


Re: [DISCUS] consuming messages is polling - is there a reason? new KIP for poll?

2017-01-31 Thread radai
minimizing the cost of clients is part of what makes kafka scale.
a push model would shift a lot of tracking logic onto the broker.

On Tue, Jan 31, 2017 at 2:47 AM, Alexander Binzberger <
alexander.binzber...@wingcon.com> wrote:

> way it seams like the protocol and the high-level consumer would be
> simplified.
> Clients have a more natural control over the offset and could ack per
> message or per bulk as needed or performance allows.
> Additionally the stream processing path over
>


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-01-27 Thread radai
a few extra points:

1. broker per disk might also incur more client <--> broker sockets:
suppose every producer / consumer "talks" to >1 partition, there's a very
good chance that partitions that were co-located on a single 10-disk broker
would now be split between several single-disk broker processes on the same
machine. hard to put a multiplier on this, but likely >x1. sockets are a
limited resource at the OS level and incur some memory cost (kernel buffers)

2. there's a memory overhead to spinning up a JVM (compiled code and byte
code objects etc). if we assume this overhead is ~300 MB (order of
magnitude, specifics vary) then spinning up 10 JVMs would lose you 3 GB of
RAM. not a ton, but non negligible.

3. there would also be some overhead downstream of kafka in any management
/ monitoring / log aggregation system. likely less than x10 though.

4. (related to above) - added complexity of administration with more
running instances.

is anyone running kafka with anywhere near 100GB heaps? i thought the point
was to rely on kernel page cache to do the disk buffering 

On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin  wrote:

> Hey Colin,
>
> Thanks much for the comment. Please see my comment inline.
>
> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe  wrote:
>
> > On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
> > > Hey Colin,
> > >
> > > Good point! Yeah we have actually considered and tested this solution,
> > > which we call one-broker-per-disk. It would work and should require no
> > > major change in Kafka as compared to this JBOD KIP. So it would be a
> good
> > > short term solution.
> > >
> > > But it has a few drawbacks which makes it less desirable in the long
> > > term.
> > > Assume we have 10 disks on a machine. Here are the problems:
> >
> > Hi Dong,
> >
> > Thanks for the thoughtful reply.
> >
> > >
> > > 1) Our stress test result shows that one-broker-per-disk has 15% lower
> > > throughput
> > >
> > > 2) Controller would need to send 10X as many LeaderAndIsrRequest,
> > > MetadataUpdateRequest and StopReplicaRequest. This increases the burden
> > > on
> > > controller which can be the performance bottleneck.
> >
> > Maybe I'm misunderstanding something, but there would not be 10x as many
> > StopReplicaRequest RPCs, would there?  The other requests would increase
> > 10x, but from a pretty low base, right?  We are not reassigning
> > partitions all the time, I hope (or else we have bigger problems...)
> >
>
> I think the controller will group StopReplicaRequest per broker and send
> only one StopReplicaRequest to a broker during controlled shutdown. Anyway,
> we don't have to worry about this if we agree that other requests will
> increase by 10X. One MetadataRequest is sent to each broker in the cluster
> every time there is a leadership change. I am not sure this is a real
> problem. But in theory this makes the overhead complexity O(number of
> brokers) and may be a concern in the future. Ideally we should avoid it.
>
>
> >
> > >
> > > 3) Less efficient use of physical resource on the machine. The number
> of
> > > socket on each machine will increase by 10X. The number of connection
> > > between any two machine will increase by 100X.
> > >
> > > 4) Less efficient way to management memory and quota.
> > >
> > > 5) Rebalance between disks/brokers on the same machine will less
> > > efficient
> > > and less flexible. Broker has to read data from another broker on the
> > > same
> > > machine via socket. It is also harder to do automatic load balance
> > > between
> > > disks on the same machine in the future.
> > >
> > > I will put this and the explanation in the rejected alternative
> section.
> > > I
> > > have a few questions:
> > >
> > > - Can you explain why this solution can help avoid scalability
> > > bottleneck?
> > > I actually think it will exacerbate the scalability problem due the 2)
> > > above.
> > > - Why can we push more RPC with this solution?
> >
> > To really answer this question we'd have to take a deep dive into the
> > locking of the broker and figure out how effectively it can parallelize
> > truly independent requests.  Almost every multithreaded process is going
> > to have shared state, like shared queues or shared sockets, that is
> > going to make scaling less than linear when you add disks or processors.
> >  (And clearly, another option is to improve that scalability, rather
> > than going multi-process!)
> >
>
> Yeah I also think it is better to improve scalability inside kafka code if
> possible. I am not sure we currently have any scalability issue inside
> Kafka that can not be removed without using multi-process.
>
>
> >
> > > - It is true that a garbage collection in one broker would not affect
> > > others. But that is after every broker only uses 1/10 of the memory.
> Can
> > > we be sure that this will actually help performance?
> >
> > The big question is, how much memory do Kafka brokers use now, and how
> 

Re: [DISCUSS] KIP-111 : Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-01-21 Thread radai
LGTM.

Kafka currently allows setting both a custom PrincipalBuilder and a custom
Authorizer (expected to act on the output of the principal builder) but
makes the naive assumption that any and all information about a (custom)
principal is solely contained in its name property. this kip addresses that.

On Fri, Jan 20, 2017 at 4:15 PM, Mayuresh Gharat  wrote:

> Hi,
>
> Just wanted to see if anyone had any concerns with this KIP.
> I would like to put this to vote soon, if there are no concerns.
>
> Thanks,
>
> Mayuresh
>
> On Thu, Jan 12, 2017 at 11:21 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > Hi Ismael,
> >
> > Fair point. I will update it.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Thu, Jan 12, 2017 at 11:07 AM, Ismael Juma  wrote:
> >
> >> Hi Mayuresh,
> >>
> >> Thanks for the KIP. A quick comment before I do a more detailed
> analysis,
> >> the KIP says:
> >>
> >> `This KIP is a pure addition to existing functionality and does not
> >> include
> >> any backward incompatible changes.`
> >>
> >> However, the KIP is proposing the addition of a method to the
> >> PrincipalBuilder pluggable interface, which is not a compatible change.
> >> Existing implementations would no longer compile, for example. It would
> be
> >> good to make this clear in the KIP.
> >>
> >> Ismael
> >>
> >> On Thu, Jan 12, 2017 at 5:44 PM, Mayuresh Gharat <
> >> gharatmayures...@gmail.com
> >> > wrote:
> >>
> >> > Hi all.
> >> >
> >> > We created KIP-111 to propose that Kafka should preserve the Principal
> >> > generated by the PrincipalBuilder while processing the request
> received
> >> on
> >> > socket channel, on the broker.
> >> >
> >> > Please find the KIP wiki in the link
> >> > https://cwiki.apache.org/confluence/pages/viewpage.action?
> >> pageId=67638388.
> >> > We would love to hear your comments and suggestions.
> >> >
> >> >
> >> > Thanks,
> >> >
> >> > Mayuresh
> >> >
> >>
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


Re: [VOTE] KIP-74: Add FetchResponse size limit in bytes

2017-01-21 Thread radai
+1

On Fri, Jan 20, 2017 at 9:51 PM, Apurva Mehta  wrote:

> +1
>
> On Fri, Jan 20, 2017 at 5:19 PM, Jason Gustafson 
> wrote:
>
> > +1
> >
> > On Fri, Jan 20, 2017 at 4:51 PM, Ismael Juma  wrote:
> >
> > > Good catch, Colin. +1 to editing the wiki to match the desired
> behaviour
> > > and what was implemented in 0.10.1.
> > >
> > > Ismael
> > >
> > > On Sat, Jan 21, 2017 at 12:19 AM, Colin McCabe 
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > While looking at some code related to KIP-74, I noticed a slight
> > > > discrepancy between the text on the wiki and the implementation.  The
> > > > wiki says that "If max_bytes is Int.MAX_INT, new request behaves
> > exactly
> > > > like old one."  This would mean that if there was a single message
> that
> > > > was larger than the maximum bytes per partition, zero messages would
> be
> > > > returned, and clients would throw MessageSizeTooLargeException.
> > > > However, the code does not implement this.  Instead, it implements
> the
> > > > "new" behavior where the client always gets at least one message.
> > > >
> > > > The new behavior seems to be more desirable, since clients do not
> "get
> > > > stuck" on messages that are too big.  I propose that we edit the wiki
> > to
> > > > reflect the implemented behavior by deleting the references to
> special
> > > > behavior when max_bytes is MAX_INT.
> > > >
> > > > cheers,
> > > > Colin
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-81: Max in-flight fetches

2017-01-17 Thread radai
i have (hopefully?) addressed Rajini's concern of muting all connections
ahead of time on the KIP-72 PR.
as for avoiding the pool for small allocations i think that's a great idea.
I also think you could implement it as a composite pool :-)
(composite redirects all requests under size X to the NONE pool and above X
to some "real" pool)
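a minimal sketch of that composite idea (class and method names here are my
own assumptions for illustration, not the actual KIP-72 code):

```python
class SimplePool:
    """A bounded pool: tracks outstanding bytes against a fixed capacity."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.outstanding = 0

    def try_allocate(self, size):
        if self.outstanding + size > self.capacity:
            return None  # pool exhausted - caller should back off / mute
        self.outstanding += size
        return bytearray(size)

    def release(self, buf):
        self.outstanding -= len(buf)


class NonePool:
    """The 'NONE' pool: unbounded, never rejects an allocation."""
    def try_allocate(self, size):
        return bytearray(size)

    def release(self, buf):
        pass


class CompositePool:
    """Routes allocations <= threshold to the NONE pool, larger ones to a real pool."""
    def __init__(self, real_pool, threshold):
        self.real = real_pool
        self.none = NonePool()
        self.threshold = threshold

    def _pool_for(self, size):
        return self.none if size <= self.threshold else self.real

    def try_allocate(self, size):
        return self._pool_for(size).try_allocate(size)

    def release(self, buf):
        # route the release by the same size threshold used for allocation
        self._pool_for(len(buf)).release(buf)
```

the point being that small (heartbeat-sized) requests never block even when
the bounded pool is exhausted by large fetch responses.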

On Wed, Jan 11, 2017 at 8:05 AM, Mickael Maison <mickael.mai...@gmail.com>
wrote:

> Ok thanks for the clarification.
> I agree too, I don't want a new config parameter. From the numbers we
> gathered (see Edoardo's comment above), it shouldn't be too hard to
> pick a meaningful value
>
> On Wed, Jan 11, 2017 at 3:58 PM, Rajini Sivaram <rajinisiva...@gmail.com>
> wrote:
> > Mickael,
> >
> > I had based the comment on KIP-72 description where brokers were muting
> all
> > client channels once memory pool was empty. Having reviewed the PR
> today, I
> > think it may be fine to delay muting and allocate small buffers outside
> of
> > the pool. I would still not want to have a config parameter to decide
> what
> > "small" means, a well chosen hard limit would suffice.
> >
> > On Wed, Jan 11, 2017 at 3:05 PM, Mickael Maison <
> mickael.mai...@gmail.com>
> > wrote:
> >
> >> Rajini,
> >>
> >> Why do you think we don't want to do the same for brokers ?
> >> It feels like brokers would be affected the same way and could end up
> >> delaying group/hearbeat requests.
> >>
> >> Also given queued.max.requests it seems unlikely that small requests
> >> (<<1Kb) being allocated outside of the memory pool would cause OOM
> >> exceptions
> >>
> >>
> >> On Wed, Dec 14, 2016 at 12:29 PM, Rajini Sivaram <rsiva...@pivotal.io>
> >> wrote:
> >> > Edo,
> >> >
> >> > I wouldn't introduce a new config entry, especially since you don't
> need
> >> it
> >> > after KAFKA-4137. As a temporary measure that would work for
> consumers.
> >> But
> >> > you probably don't want to do the same for brokers - will be worth
> >> checking
> >> > with Radai since the implementation will be based on KIP-72. To do
> this
> >> > only for consumers, you will need some conditions in the common
> network
> >> > code while allocating and releasing buffers. A bit messy, but doable.
> >> >
> >> >
> >> >
> >> > On Wed, Dec 14, 2016 at 11:32 AM, Edoardo Comar <eco...@uk.ibm.com>
> >> wrote:
> >> >
> >> >> Thanks Rajini,
> >> >> Before Kafka-4137, we could avoid coordinator starvation without
> making
> >> a
> >> >> special case for a special connection,
> >> >> but rather simply, in applying the buffer.memory check only to
> 'large'
> >> >> responses
> >> >> (e.g.  size > 1k, possibly introducing a new config entry) in
> >> >>
> >> >> NetworkReceive.readFromReadableChannel(ReadableByteChannel)
> >> >>
> >> >> Essentially this would limit reading fetch responses but allow for
> other
> >> >> responses to be processed.
> >> >>
> >> >> This is a sample of sizes for responses I collected :
> >> >>
> >> >> * size=108 APIKEY=3 METADATA
> >> >> *  size=28 APIKEY=10 GROUP_COORDINATOR
> >> >> *  size=193 APIKEY=11 JOIN_GROUP
> >> >> *  size=39 APIKEY=14 SYNC_GROUP
> >> >> *  size=39 APIKEY=9 OFFSET_FETCH
> >> >> *  size=45 APIKEY=2 LIST_OFFSETS
> >> >> *  size=88926 APIKEY=1 FETCH
> >> >> *  size=45 APIKEY=1 FETCH
> >> >> *  size=6 APIKEY=12 HEARTBEAT
> >> >> *  size=45 APIKEY=1 FETCH
> >> >> *  size=45 APIKEY=1 FETCH
> >> >> *  size=45 APIKEY=1 FETCH
> >> >> *  size=6 APIKEY=12 HEARTBEAT
> >> >> *  size=45 APIKEY=1 FETCH
> >> >> *  size=45 APIKEY=1 FETCH
> >> >>
> >> >> What do you think?
> >> >> --
> >> >> Edoardo Comar
> >> >> IBM MessageHub
> >> >> eco...@uk.ibm.com
> >> >> IBM UK Ltd, Hursley Park, SO21 2JN
> >> >>
> >> >> IBM United Kingdom Limited Registered in England and Wales with
> number
> >> >> 741598 Registered office: PO Box 41, North Ha

Re: [VOTE] KIP-107 Add purgeDataBefore() API in AdminClient

2017-01-11 Thread radai
LGTM, +1

On Wed, Jan 11, 2017 at 1:01 PM, Dong Lin  wrote:

> Hi all,
>
> It seems that there is no further concern with the KIP-107. At this point
> we would like to start the voting process. The KIP can be found at
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-107
> %3A+Add+purgeDataBefore%28%29+API+in+AdminClient.
>
> Thanks,
> Dong
>


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-01-11 Thread radai
while HTTP-style (string, string) are the most common and most familiar,
there is a very significant impact on msg size, especially given that some
payloads are literally a few integers (think stock quotes) and would be
dwarfed by an http-like header segment.

I think we're ok with not allowing for repeatable headers because:

1. either they're set by multiple parties that are clashing by mistake - in
this case downstream parsers will fail no matter what we do as the payloads
in those headers are presumably different

2. or they're set by the same "component"/system/plugin in which case the
owner of the code can read-modify-write the same single header.
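to put a rough number on the size concern raised above, here is a sketch
comparing a small binary payload against HTTP-style string headers (the
header names and values are made up for illustration, and Kafka's record
framing/varint encoding is ignored):

```python
# a tiny payload, e.g. a stock quote encoded as two 64-bit ints (16 bytes)
payload = (1234).to_bytes(8, "big") + (5678).to_bytes(8, "big")

headers = {  # hypothetical HTTP-style (string, string) headers
    "content-type": "application/x-quote",
    "producer-service": "quote-feed-01",
    "trace-id": "4bf92f3577b34da6a3ce929d0e0e4736",
}
header_bytes = sum(len(k.encode()) + len(v.encode()) for k, v in headers.items())

print(len(payload), header_bytes)  # the headers dwarf the 16-byte payload
```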

On Tue, Jan 10, 2017 at 10:38 PM, Ewen Cheslack-Postava 
wrote:

> not necessarily advocating for supporting this, just suggesting that we
> think about it. As you add these features and get closer to mapping to
> other systems, people will inevitably try to map them. Headers are an area
> where, if we're going to add them, it's worth considering compatibility as
> someone will inevitably come and complain that system X does Y with headers
> and we should also support Y because any decent system that provides
> headers will do so.
>


out of the box security?

2017-01-09 Thread radai
in light of things like this -
https://www.bleepingcomputer.com/news/security/mongodb-apocalypse-is-here-as-ransom-attacks-hit-10-000-servers/

and given that plenty of people/orgs have public facing kafka installations
that are wide open - https://www.shodan.io/search?query=kafka (yes, i
realize those aren't brokers, but you could scan for those too).

has anyone given any thought to making brokers more secure by default?

maybe something like making the default password be some function of the ZK
url? (which should be common to all brokers) or something 


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2017-01-06 Thread radai
JIRA up - https://issues.apache.org/jira/browse/KAFKA-4602
PR up - https://github.com/apache/kafka/pull/2330
KIP wiki has been updated.



On Fri, Jan 6, 2017 at 8:16 AM, radai <radai.rosenbl...@gmail.com> wrote:

> Will do (sorry for the delay).
> and thank you.
>
> On Fri, Jan 6, 2017 at 7:56 AM, Ismael Juma <ism...@juma.me.uk> wrote:
>
>> Radai, you have more than enough votes to declare the vote successful.
>> Maybe it's time to do so. :) Also, once you have done that, it would be
>> good to move this KIP to the adopted table in the wiki.
>>
>> Thanks!
>>
>> Ismael
>>
>> On Fri, Jan 6, 2017 at 2:10 AM, Jun Rao <j...@confluent.io> wrote:
>>
>> > Hi, Radai,
>> >
>> > The new metrics look good. +1 on the KIP.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Fri, Dec 16, 2016 at 4:46 PM, radai <radai.rosenbl...@gmail.com>
>> wrote:
>> >
>> > > I've added the 3 new metrics/sensors i've implemented to the KIP.
>> > >
>> > > at this point I would need to re-validate the functionality (which i
>> > expect
>> > > to do early january).
>> > >
>> > > code reviews welcome ;-)
>> > >
>> > > On Mon, Nov 28, 2016 at 10:37 AM, radai <radai.rosenbl...@gmail.com>
>> > > wrote:
>> > >
>> > > > will do (only added a single one so far, the rest TBD)
>> > > >
>> > > > On Mon, Nov 28, 2016 at 10:04 AM, Jun Rao <j...@confluent.io> wrote:
>> > > >
>> > > >> Hi, Radai,
>> > > >>
>> > > >> Could you add a high level description of the newly added metrics
>> to
>> > the
>> > > >> KIP wiki?
>> > > >>
>> > > >> Thanks,
>> > > >>
>> > > >> Jun
>> > > >>
>> > > >> On Wed, Nov 23, 2016 at 3:45 PM, radai <radai.rosenbl...@gmail.com
>> >
>> > > >> wrote:
>> > > >>
>> > > >> > Hi Jun,
>> > > >> >
>> > > >> > I've added the sensor you requested (or at least I think I did
>> )
>> > > >> >
>> > > >> > On Fri, Nov 18, 2016 at 12:37 PM, Jun Rao <j...@confluent.io>
>> wrote:
>> > > >> >
>> > > >> > > KafkaRequestHandlerPool
>> > > >> >
>> > > >>
>> > > >
>> > > >
>> > >
>> >
>>
>
>


[GitHub] kafka pull request #2330: KAFKA-4602 - KIP-72 - Allow putting a bound on mem...

2017-01-06 Thread radai-rosenblatt
GitHub user radai-rosenblatt opened a pull request:

https://github.com/apache/kafka/pull/2330

KAFKA-4602 - KIP-72 - Allow putting a bound on memory consumed by Incoming 
requests

this is the initial implementation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/radai-rosenblatt/kafka 
broker-memory-pool-with-muting

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2330


commit 5697bd5a766b61bc9fe5e20c9a605249dd9b284d
Author: radai-rosenblatt <radai.rosenbl...@gmail.com>
Date:   2016-09-27T16:51:30Z

KAFKA-4602 - introduce MemoryPool interface, use it to control total 
outstanding memory dedicated to broker requests

Signed-off-by: radai-rosenblatt <radai.rosenbl...@gmail.com>




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Different key with the same digest in log compaction

2017-01-06 Thread radai
i just noticed my link didn't parse correctly. the formula for probability
of collision is explained by googling "birthday paradox".
if anything, we could further optimize this code to use a trivial hash map
for cases where sizeOf(hash) > sizeOf(key)
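the birthday-bound estimate (n = 2^32 keys, d = 2^128 possible MD5 digests)
can be checked with a few lines - a sketch using the standard approximation,
not Kafka code:

```python
import math

def collision_probability(n, d):
    """P(at least one collision) among n keys hashed uniformly into d digests,
    i.e. 1 - ((d-1)/d)^(n*(n-1)/2), computed in a float-safe way via log1p/expm1."""
    pairs = n * (n - 1) // 2
    return -math.expm1(pairs * math.log1p(-1.0 / d))

p = collision_probability(2**32, 2**128)  # ~4 billion keys, MD5-sized digest
print(p)  # on the order of 1e-20
```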

On Fri, Jan 6, 2017 at 10:00 AM, Colin McCabe <cmcc...@apache.org> wrote:

> That's a fair point.  The calls to Arrays.equals are comparing just the
> hashes, not the keys.
>
> Colin
>
> On Tue, Jan 3, 2017, at 17:15, radai wrote:
> > looking at the code (SkimpyOffsetMap.get/put) they both start with
> > hashInto(key, hash1) and then ignore key from that point on - so we're
> > not
> > using the key unless i'm missing something?
> >
> > as for the probability of collision - it depends on the hash algo and the
> > number of keys. if you configure it to use something like sha-512 the
> > probability is truly negligible.
> >
> > for example, the probability of collision on a topic with 4 billion
> > entries
> > using MD5 is ~ 10^-20 (math -
> > http://www.wolframalpha.com/input/?i=n+%3D+2
> > ^32,+d+%3D+2^128,+1+-+%28%28d-1%29%2Fd%29+^+%28n%28n-1%29%2F2%29)
> >
> > On Tue, Jan 3, 2017 at 4:37 PM, Colin McCabe <cmcc...@apache.org> wrote:
> >
> > > Can you be a little bit clearer on why you think that different keys
> > > with the same digest value will be treated as the same key?
> > > SkimpyOffsetMap#get and SkimpyOffsetMap#put compare the key, not just
> > > the hash digest of the key.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Wed, Dec 21, 2016, at 23:27, Renkai Ge wrote:
> > > > Hi,all:
> > > >  I am just learning the kafka codebase, as what I saw in
> > > > https://github.com/apache/kafka/blob/6ed3e6b1cb8a73b1f5f78926ccb247
> > > a8953a554c/core/src/main/scala/kafka/log/OffsetMap.scala#L43-L43
> > > >
> > > > if different log keys have the same digest value, they will be
> treated as
> > > > the same key in log compaction. Though the risk of such a thing
> happening is
> > > > very small, I still want it to be avoided. If what I thought is wrong
> > > > please
> > > > let me know, and I hope to know the thoughts of who created or
> > > > is maintaining the code.
> > >
>


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2017-01-06 Thread radai
Will do (sorry for the delay).
and thank you.

On Fri, Jan 6, 2017 at 7:56 AM, Ismael Juma <ism...@juma.me.uk> wrote:

> Radai, you have more than enough votes to declare the vote successful.
> Maybe it's time to do so. :) Also, once you have done that, it would be
> good to move this KIP to the adopted table in the wiki.
>
> Thanks!
>
> Ismael
>
> On Fri, Jan 6, 2017 at 2:10 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Radai,
> >
> > The new metrics look good. +1 on the KIP.
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Dec 16, 2016 at 4:46 PM, radai <radai.rosenbl...@gmail.com>
> wrote:
> >
> > > I've added the 3 new metrics/sensors i've implemented to the KIP.
> > >
> > > at this point I would need to re-validate the functionality (which i
> > expect
> > > to do early january).
> > >
> > > code reviews welcome ;-)
> > >
> > > On Mon, Nov 28, 2016 at 10:37 AM, radai <radai.rosenbl...@gmail.com>
> > > wrote:
> > >
> > > > will do (only added a single one so far, the rest TBD)
> > > >
> > > > On Mon, Nov 28, 2016 at 10:04 AM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > >> Hi, Radai,
> > > >>
> > > >> Could you add a high level description of the newly added metrics to
> > the
> > > >> KIP wiki?
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >> On Wed, Nov 23, 2016 at 3:45 PM, radai <radai.rosenbl...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi Jun,
> > > >> >
> > > >> > I've added the sensor you requested (or at least I think I did
> )
> > > >> >
> > > >> > On Fri, Nov 18, 2016 at 12:37 PM, Jun Rao <j...@confluent.io>
> wrote:
> > > >> >
> > > >> > > KafkaRequestHandlerPool
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-109: Old Consumer Deprecation

2017-01-05 Thread radai
I can't speak for anyone else, but a rolling upgrade is definitely how we
(LinkedIn) will do the migration.

On Thu, Jan 5, 2017 at 4:28 PM, Gwen Shapira  wrote:

> it sounds good to have
> it, but that's probably not how people will end up migrati
>


[jira] [Created] (KAFKA-4602) KIP-72 Allow putting a bound on memory consumed by Incoming requests

2017-01-05 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-4602:
---

 Summary: KIP-72 Allow putting a bound on memory consumed by 
Incoming requests
 Key: KAFKA-4602
 URL: https://issues.apache.org/jira/browse/KAFKA-4602
 Project: Kafka
  Issue Type: New Feature
  Components: core
Reporter: radai rosenblatt
Assignee: radai rosenblatt


this issue tracks the implementation of KIP-72, as outlined here - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-109: Old Consumer Deprecation

2017-01-05 Thread radai
i'm all for (working towards) getting rid of old code, but there's still no
solid migration path - you'll be "stranding" users on deprecated, no longer
maintained code with no "safe" way out that does not involve downtime
(specifically old and new consumers cannot correctly divide up partitions
between themselves if both operate within the same group on the same topic).

On Thu, Jan 5, 2017 at 3:10 PM, Gwen Shapira  wrote:

> Very strong support from me too :)
>
> On Thu, Jan 5, 2017 at 12:09 PM, Vahid S Hashemian
>  wrote:
> > Hi all,
> >
> > There was some discussion recently on deprecating the old consumer (
> > https://www.mail-archive.com/dev@kafka.apache.org/msg59084.html).
> > Ismael suggested to cover the discussion and voting of major deprecations
> > like this under a KIP.
> >
> > So I started KIP-109 (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 109%3A+Old+Consumer+Deprecation
> > ) and look forward to your feedback and comments.
> >
> > We'd like to implement this deprecation in the upcoming 0.10.2.0 release.
> >
> > Thanks.
> > --Vahid
> >
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

2017-01-04 Thread radai
one more example of complicated config - mirror maker.

we definitely can't trust each and every topic owner to configure their
topics not to purge before they've been mirrored.
which would mean there's a per-topic config (set by the owner) and a
"global" config (where mirror makers are specified) and they need to be
"merged".
for those topics that _are_ mirrored.
which is a changing set of topics thats stored in an external system
outside of kafka.
if a topic is taken out of the mirror set the MM offset would be "frozen"
at that point and prevent clean-up for all eternity, unless it's cleaned up
itself.

...

complexity :-)
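the tracked-groups clean-up strategy discussed in this thread could be
sketched as a helper in an external monitoring service - everything below is
hypothetical illustration, not the KIP-107 AdminClient API:

```python
# given per-group committed offsets for one partition, compute a safe purge
# offset: the minimum committed offset across tracked groups, minus an
# optional "offset distance" safety margin (one of the strategies from 1.4).
def safe_purge_offset(committed_offsets, tracked_groups, min_offset_distance=0):
    """committed_offsets: {group_name: committed_offset} for one partition."""
    tracked = [off for g, off in committed_offsets.items() if g in tracked_groups]
    if not tracked:
        return 0  # no tracked group has committed yet - purge nothing
    return max(0, min(tracked) - min_offset_distance)

offsets = {"mirror-maker": 1000, "app-group": 1200, "console-consumer-123": 50}
# ad-hoc console consumers (point 2 above) are deliberately not tracked
print(safe_purge_offset(offsets, {"mirror-maker", "app-group"}, min_offset_distance=100))
```

the external service would then invoke the purge API with this offset,
keeping the "brains" outside the broker as argued above.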

On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenbl...@gmail.com> wrote:

> in summary - i'm not opposed to the idea of a per-topic clean up config
> that tracks some set of consumer groups' offsets (which would probably work
> for 80% of use cases), but i definitely see a need to expose a simple API
> for the more advanced/obscure/custom use cases (the other 20%).
>
> On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenbl...@gmail.com> wrote:
>
>> a major motivation for this KIP is cost savings.
>>
>> lots of internal systems at LI use kafka as an intermediate pipe, and set
>> the topic retention period to a "safe enough" amount of time to be able to
>> recover from crashes/downtime and catch up to "now". this results in a few
>> days' worth of retention typically.
>>
>> however, under normal operating conditions the consumers are mostly
>> caught-up and so early clean-up enables a big cost savings in storage.
>>
>> as for my points:
>>
>> 1. when discussing implementation options for automatic clean-up we
>> realized that cleaning up by keeping track of offsets stored in kafka
>> requires some per-topic config - you need to specify which groups to track.
>> this becomes a problem because:
>> 1.1 - relatively complicated code, to be written in the broker.
>> 1.2 - configuration needs to be maintained up to date by topic
>> "owners" - of which we have thousands. failure to do so would decrease the
>> cost benefit.
>> 1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
>> workflow where they will reset their offsets to an earlier value than the
>> one stored. this means that a stored offset of X does not always mean you
> >> can clean up to X-1. think of it as video encoding - some apps have "key
>> frames" they may seek back to which are before their current offset.
>> 1.4 - there are multiple possible strategies - you could clean up
>> aggressively, retain some "time distance" from latest, some "offset
>> distance", etc. this we think would have made it very hard to agree on a
>> single "correct" implementation that everyone would be happy with. it would
>> be better to include the raw functionality in the API and leave the
> >> "brains" to an external monitoring system where people could custom-tailor
>> their logic
>>
> >> 2. ad-hoc consumer groups: it's common practice for devs to spin up
>> console consumers and connect to a topic as a debug aid. SREs may also do
>> this. there are also various other eco-system applications that may
> >> consume from topics (unknown to topic owners as those are infra monitoring
>> tools). obviously such consumer-groups' offsets should be ignored for
>> purposes of clean-up, but coming up with a bullet-proof way to do this is
>> non-trivial and again ties with implementation complexity and inflexibility
>> of a "one size fits all" solution in 1.4 above.
>>
>> 3. forceful clean-up: we have workflows that use kafka to move gigantic
>> blobs from offline hadoop processing flows into systems. the data being
> >> "loaded" into such an online system can be several GBs in size and take a
>> long time to consume (they are sliced into many small msgs). sometimes the
>> sender wants to abort and start a new blob before the current load process
>> has completed - meaning the consumer's offsets are not yet caught up.
>>
>> 4. offsets outside of kafka: yes, you could force applications to store
> >> their offsets twice, but that's inefficient. it's better to expose a raw,
>> simple API and let such applications manage their own clean-up logic (this
>> again ties into 1.4 and no "one size fits all" solution)
>>
>> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <
>>> e...@confluent.io>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

2017-01-04 Thread radai
in summary - i'm not opposed to the idea of a per-topic clean up config
that tracks some set of consumer groups' offsets (which would probably work
for 80% of use cases), but i definitely see a need to expose a simple API
for the more advanced/obscure/custom use cases (the other 20%).

On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenbl...@gmail.com> wrote:

> a major motivation for this KIP is cost savings.
>
> lots of internal systems at LI use kafka as an intermediate pipe, and set
> the topic retention period to a "safe enough" amount of time to be able to
> recover from crashes/downtime and catch up to "now". this results in a few
> days' worth of retention typically.
>
> however, under normal operating conditions the consumers are mostly
> caught-up and so early clean-up enables a big cost savings in storage.
>
> as for my points:
>
> 1. when discussing implementation options for automatic clean-up we
> realized that cleaning up by keeping track of offsets stored in kafka
> requires some per-topic config - you need to specify which groups to track.
> this becomes a problem because:
> 1.1 - relatively complicated code, to be written in the broker.
> 1.2 - configuration needs to be maintained up to date by topic
> "owners" - of which we have thousands. failure to do so would decrease the
> cost benefit.
> 1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
> workflow where they will reset their offsets to an earlier value than the
> one stored. this means that a stored offset of X does not always mean you
> can clean up to X-1. think of it as video encoding - some apps have "key
> frames" they may seek back to which are before their current offset.
> 1.4 - there are multiple possible strategies - you could clean up
> aggressively, retain some "time distance" from latest, some "offset
> distance", etc. this we think would have made it very hard to agree on a
> single "correct" implementation that everyone would be happy with. it would
> be better to include the raw functionality in the API and leave the
> "brains" to an external monitoring system where people could custom-taylor
> their logic
>
> 2. ad-hoc consumer groups: its common practice for devs to spin up console
> consumers and connect to a topic as a debug aid. SREs may also do this.
> there are also various other eco-system applications that may consume from
> topics (unknown to topic owners as those are infra monitoring tools).
> obviously such consumer-groups' offsets should be ignored for purposes of
> clean-up, but coming up with a bullet-proof way to do this is non-trivial
> and again ties with implementation complexity and inflexibility of a "one
> size fits all" solution in 1.4 above.
>
> 3. forceful clean-up: we have workflows that use kafka to move gigantic
> blobs from offline hadoop processing flows into online systems. the data being
> "loaded" into such an online system can be several GBs in size and take a
> long time to consume (they are sliced into many small msgs). sometimes the
> sender wants to abort and start a new blob before the current load process
> has completed - meaning the consumer's offsets are not yet caught up.
>
> 4. offsets outside of kafka: yes, you could force applications to store
> their offsets twice, but thats inefficient. its better to expose a raw,
> simple API and let such applications manage their own clean-up logic (this
> again ties into 1.4 and no "one size fits all" solution)
>
> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindon...@gmail.com> wrote:
>
>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <e...@confluent.io
>> >
>> wrote:
>>
>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > > Hey Ewen,
>> > >
>> > > Thanks for the review. As Radai explained, it would be complex in
>> terms
>> > of
>> > > user configuration if we were to use committed offset to decide data
>> > > deletion. We need a way to specify which groups need to consume data
>> of
>> > > this partition. The broker will also need to consume the entire
>> offsets
>> > > topic in that approach which has some overhead. I don't think it is
>> that
>> > > hard to implement. But it will likely take more time to discuss that
>> > > approach due to the new config and the server side overhead.
>> > >
>> > > We choose to put this API in AdminClient because the API is more like
>> an
>> > > administrative operation (such as

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

2017-01-04 Thread radai
a major motivation for this KIP is cost savings.

lots of internal systems at LI use kafka as an intermediate pipe, and set
the topic retention period to a "safe enough" amount of time to be able to
recover from crashes/downtime and catch up to "now". this results in a few
days' worth of retention typically.

however, under normal operating conditions the consumers are mostly
caught-up and so early clean-up enables a big cost savings in storage.

as for my points:

1. when discussing implementation options for automatic clean-up we
realized that cleaning up by keeping track of offsets stored in kafka
requires some per-topic config - you need to specify which groups to track.
this becomes a problem because:
1.1 - relatively complicated code, to be written in the broker.
1.2 - configuration needs to be maintained up to date by topic "owners"
- of which we have thousands. failure to do so would decrease the cost
benefit.
1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
workflow where they will reset their offsets to an earlier value than the
one stored. this means that a stored offset of X does not always mean you
can clean up to X-1. think of it as video encoding - some apps have "key
frames" they may seek back to which are before their current offset.
1.4 - there are multiple possible strategies - you could clean up
aggressively, retain some "time distance" from latest, some "offset
distance", etc. this we think would have made it very hard to agree on a
single "correct" implementation that everyone would be happy with. it would
be better to include the raw functionality in the API and leave the
"brains" to an external monitoring system where people could custom-taylor
their logic

2. ad-hoc consumer groups: its common practice for devs to spin up console
consumers and connect to a topic as a debug aid. SREs may also do this.
there are also various other eco-system applications that may consume from
topics (unknown to topic owners as those are infra monitoring tools).
obviously such consumer-groups' offsets should be ignored for purposes of
clean-up, but coming up with a bullet-proof way to do this is non-trivial
and again ties with implementation complexity and inflexibility of a "one
size fits all" solution in 1.4 above.

3. forceful clean-up: we have workflows that use kafka to move gigantic
blobs from offline hadoop processing flows into online systems. the data being
"loaded" into such an online system can be several GBs in size and take a
long time to consume (they are sliced into many small msgs). sometimes the
sender wants to abort and start a new blob before the current load process
has completed - meaning the consumer's offsets are not yet caught up.

4. offsets outside of kafka: yes, you could force applications to store
their offsets twice, but thats inefficient. its better to expose a raw,
simple API and let such applications manage their own clean-up logic (this
again ties into 1.4 and no "one size fits all" solution)
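
As a concrete illustration of point 1.4 — raw purge functionality in the API, "brains" in an external monitoring system — here is a minimal sketch of how such a system might compute a safe purge offset from tracked groups' committed offsets combined with an "offset distance" strategy. All names here are hypothetical, not an existing Kafka API:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical helper for an external clean-up service built on a raw
// purge API: everything strictly below the returned offset is safe to
// purge for the tracked groups, while never purging closer than
// minOffsetDistance to the log end (one possible "strategy" from 1.4).
public class PurgePolicy {
    public static long safePurgeOffset(Collection<Long> committedOffsets,
                                       long logEndOffset,
                                       long minOffsetDistance) {
        long slowest = committedOffsets.isEmpty()
                ? 0L
                : Collections.min(committedOffsets);
        long distanceBound = Math.max(0L, logEndOffset - minOffsetDistance);
        return Math.min(slowest, distanceBound);
    }

    public static void main(String[] args) {
        // groups at offsets 90/100/250 on a partition whose log end is 1000,
        // keeping at least 500 offsets of history
        System.out.println(safePurgeOffset(Arrays.asList(100L, 250L, 90L), 1000L, 500L)); // prints 90
    }
}
```

A "key frame" reset (point 1.3) simply moves a group's committed offset backwards, and the purge bound follows it on the next evaluation.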

On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindon...@gmail.com> wrote:

> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Ewen,
> > >
> > > Thanks for the review. As Radai explained, it would be complex in terms
> > of
> > > user configuration if we were to use committed offset to decide data
> > > deletion. We need a way to specify which groups need to consume data of
> > > this partition. The broker will also need to consume the entire offsets
> > > topic in that approach which has some overhead. I don't think it is
> that
> > > hard to implement. But it will likely take more time to discuss that
> > > approach due to the new config and the server side overhead.
> > >
> > > We choose to put this API in AdminClient because the API is more like
> an
> > > administrative operation (such as listGroups, deleteTopics) than a
> > consumer
> > > operation. It is not necessarily called by consumer only. For example,
> we
> > > can implement the "delete data before committed offset" approach by
> > running
> > > an external service which calls purgeDataBefore() API based on
> committed
> > > offset of consumer groups.
> > >
> > > I am not aware that AdminClient is not a public API. Suppose it is not
> > > public now, I assume we plan to make it public in the future as part of
> > > KIP-4. Are we not making it public because its interface is not stable?
> > If
> > > so, can we just tag this new API as not stable in the code?

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-01-03 Thread radai
@jun - good proposal. i was willing to concede that read-uncommitted was
impossible under my proposal but if LSO/NSO is introduced it becomes
possible.


On Tue, Jan 3, 2017 at 7:50 AM, Jun Rao  wrote:

> Just to follow up on Radai's idea of pushing the buffering logic to the
> broker. It may be possible to do this efficiently if we assume aborted
> transactions are rare. The following is a draft proposal. For each
> partition, the broker maintains the last stable offset (LSO) as described
> in the document, and only exposes messages up to this point if the reader
> is in the read-committed mode. When a new stable offset (NSO) is
> determined, if there is no aborted message in this window, the broker
> simply advances the LSO to the NSO. If there is at least one aborted
> message, the broker first replaces the current log segment with new log
> segments excluding the aborted messages and then advances the LSO. To make
> the replacement efficient, we can replace the current log segment with 3
> new segments: (1) a new "shadow" log segment that simply references the
> portion of the current log segment from the beginning to the LSO, (2) a log
> segment created by copying only committed messages between the LSO and the
> NSO, (3) a new "shadow" log segment that references the portion of the
> current log segment from the NSO (open ended). Note that only (2) involves
> real data copying. If aborted transactions are rare, this overhead will be
> insignificant. Assuming that applications typically don't abort
> transactions, transactions will only be aborted by transaction coordinators
> during hard failure of the producers, which should be rare.
>
> This way, the consumer library's logic will be simplified. We can still
> expose uncommitted messages to readers in the read-uncommitted mode and
> therefore leave the door open for speculative reader in the future.
>
> Thanks,
>
> Jun
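
Jun's three-segment replacement above can be modeled in miniature. A toy sketch with plain lists standing in for log segments — nothing here is actual broker code; `committed` marks which offsets in the (LSO, NSO] window belong to committed transactions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Toy model of advancing the last stable offset (LSO) to a new stable
// offset (NSO). If every message in the window (LSO, NSO] is committed,
// the broker just advances the LSO; otherwise only segment (2) --
// committed messages between LSO and NSO -- involves real data copying.
public class StableOffsetAdvance {
    public static boolean canAdvanceWithoutCopy(long lso, long nso,
                                                LongPredicate committed) {
        for (long off = lso + 1; off <= nso; off++) {
            if (!committed.test(off)) {
                return false; // at least one aborted message in the window
            }
        }
        return true;
    }

    // contents of the middle segment: committed offsets in (LSO, NSO]
    public static List<Long> copiedSegment(long lso, long nso,
                                           LongPredicate committed) {
        List<Long> middle = new ArrayList<>();
        for (long off = lso + 1; off <= nso; off++) {
            if (committed.test(off)) {
                middle.add(off);
            }
        }
        return middle;
    }
}
```

If aborted transactions are rare, `canAdvanceWithoutCopy` is the common path and the copy in `copiedSegment` — the only real work — almost never runs, which is the efficiency argument made above.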
>
>
> On Wed, Dec 21, 2016 at 10:44 AM, Apurva Mehta 
> wrote:
>
> > Hi Joel,
> >
> > The alternatives are embedded in the 'discussion' sections which are
> spread
> > throughout the google doc.
> >
> > Admittedly, we have not covered high level alternatives like those which
> > have been brought up in this thread. In particular, having a separate log
> > for transactional messages and also having multiple producers participate
> in
> > a single transaction.
> >
> > This is an omission which we will correct.
> >
> > Thanks,
> > Apurva
> >
> > On Wed, Dec 21, 2016 at 10:34 AM, Joel Koshy 
> wrote:
> >
> > > >
> > > >
> > > > @Joel,
> > > >
> > > > I read over your wiki, and apart from the introduction of the notion
> of
> > > > journal partitions --whose pros and cons are already being
> discussed--
> > > you
> > > > also introduce the notion of a 'producer group' which enables
> multiple
> > > > producers to participate in a single transaction. This is completely
> > > > opposite of the model in the KIP where a transaction is defined by a
> > > > producer id, and hence there is a 1-1 mapping between producers and
> > > > transactions. Further, each producer can have exactly one in-flight
> > > > transaction at a time in the KIP.
> > > >
> > >
> > > Hi Apurva - yes I did notice those differences among other things :)
> > BTW, I
> > > haven't yet gone through the google-doc carefully but on a skim it does
> > not
> > > seem to contain any rejected alternatives as the wiki states.
> > >
> >
>


Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

2017-01-03 Thread radai
also 4. some apps may do their own offset bookkeeping

On Tue, Jan 3, 2017 at 5:29 PM, radai <radai.rosenbl...@gmail.com> wrote:

> the issue with tracking committed offsets is whose offsets do you track?
>
> 1. some topics have multiple groups
> 2. some "groups" are really one-offs like developers spinning up console
> consumer "just to see if there's data"
> 3. there are use cases where you want to deliberately "wipe" data EVEN IF
> its still being consumed
>
> #1 is a configuration mess, since there are multiple possible strategies.
> #2 is problematic without a definition of "liveness" or special handling
> for console consumers, and #3 is flat out impossible with committed-offset
> tracking
>
> On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
>> Dong,
>>
>> Looks like that's an internal link,
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
>> 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
>> is the right one.
>>
>> I have a question about one of the rejected alternatives:
>>
>> > Using committed offset instead of an extra API to trigger data purge
>> operation.
>>
>> The KIP says this would be more complicated to implement. Why is that? I
>> think brokers would have to consume the entire offsets topic, but the data
>> stored in memory doesn't seem to change and applying this when updated
>> offsets are seen seems basically the same. It might also be possible to
>> make it work even with multiple consumer groups if that was desired
>> (although that'd require tracking more data in memory) as a generalization
>> without requiring coordination between the consumer groups. Given the
>> motivation, I'm assuming this was considered unnecessary since this
>> specifically targets intermediate stream processing topics.
>>
>> Another question is why expose this via AdminClient (which isn't public
>> API
>> afaik)? Why not, for example, expose it on the Consumer, which is
>> presumably where you'd want access to it since the functionality depends
>> on
>> the consumer actually having consumed the data?
>>
>> -Ewen
>>
>> On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hi all,
>> >
>> > We created KIP-107 to propose addition of purgeDataBefore() API in
>> > AdminClient.
>> >
>> > Please find the KIP wiki in the link https://iwww.corp.linkedin.
>> > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
>> design+proposal.
>> > We
>> > would love to hear your comments and suggestions.
>> >
>> > Thanks,
>> > Dong
>> >
>>
>
>


Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

2017-01-03 Thread radai
the issue with tracking committed offsets is whose offsets do you track?

1. some topics have multiple groups
2. some "groups" are really one-offs like developers spinning up console
consumer "just to see if there's data"
3. there are use cases where you want to deliberately "wipe" data EVEN IF
its still being consumed

#1 is a configuration mess, since there are multiple possible strategies.
#2 is problematic without a definition of "liveness" or special handling
for console consumers, and #3 is flat out impossible with committed-offset
tracking

On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava 
wrote:

> Dong,
>
> Looks like that's an internal link,
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> is the right one.
>
> I have a question about one of the rejected alternatives:
>
> > Using committed offset instead of an extra API to trigger data purge
> operation.
>
> The KIP says this would be more complicated to implement. Why is that? I
> think brokers would have to consume the entire offsets topic, but the data
> stored in memory doesn't seem to change and applying this when updated
> offsets are seen seems basically the same. It might also be possible to
> make it work even with multiple consumer groups if that was desired
> (although that'd require tracking more data in memory) as a generalization
> without requiring coordination between the consumer groups. Given the
> motivation, I'm assuming this was considered unnecessary since this
> specifically targets intermediate stream processing topics.
>
> Another question is why expose this via AdminClient (which isn't public API
> afaik)? Why not, for example, expose it on the Consumer, which is
> presumably where you'd want access to it since the functionality depends on
> the consumer actually having consumed the data?
>
> -Ewen
>
> On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin  wrote:
>
> > Hi all,
> >
> > We created KIP-107 to propose addition of purgeDataBefore() API in
> > AdminClient.
> >
> > Please find the KIP wiki in the link https://iwww.corp.linkedin.
> > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+
> API+design+proposal.
> > We
> > would love to hear your comments and suggestions.
> >
> > Thanks,
> > Dong
> >
>


Re: Different key with the same digest in log compaction

2017-01-03 Thread radai
looking at the code (SkimpyOffsetMap.get/put) they both start with
hashInto(key, hash1) and then ignore key from that point on - so we're not
using the key unless im missing something?

as for the probability of collision - it depends on the hash algo and the
number of keys. if you configure it to use something like sha-512 the
probability is truly negligible.

for example, the probability of collision on a topic with 4 billion entries
using MD5 is ~ 10^-20 (math - http://www.wolframalpha.com/input/?i=n+%3D+2
^32,+d+%3D+2^128,+1+-+%28%28d-1%29%2Fd%29+^+%28n%28n-1%29%2F2%29)
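
The birthday-bound arithmetic above is easy to double-check numerically. A standalone sketch (not kafka code) computing 1 - ((d-1)/d)^(n(n-1)/2) via log1p/expm1 so the tiny probabilities don't underflow:

```java
// Numeric check of the collision bound quoted above:
// p = 1 - ((d-1)/d)^(n(n-1)/2) for n keys over d digest values,
// rewritten as -expm1(pairs * log1p(-1/d)) for numerical stability.
public class CollisionBound {
    public static double collisionProbability(double n, double d) {
        double pairs = n * (n - 1) / 2.0;
        return -Math.expm1(pairs * Math.log1p(-1.0 / d));
    }

    public static void main(String[] args) {
        double n = Math.pow(2, 32); // ~4 billion entries
        System.out.println(collisionProbability(n, Math.pow(2, 128))); // MD5: ~2.7e-20
        System.out.println(collisionProbability(n, Math.pow(2, 512))); // SHA-512: negligible
    }
}
```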

On Tue, Jan 3, 2017 at 4:37 PM, Colin McCabe  wrote:

> Can you be a little bit clearer on why you think that different keys
> with the same digest value will be treated as the same key?
> SkimpyOffsetMap#get and SkimpyOffsetMap#put compare the key, not just
> the hash digest of the key.
>
> best,
> Colin
>
>
> On Wed, Dec 21, 2016, at 23:27, Renkai Ge wrote:
> > Hi,all:
> >  I am just learning the kafka codebase, as what I saw in
> > https://github.com/apache/kafka/blob/6ed3e6b1cb8a73b1f5f78926ccb247
> a8953a554c/core/src/main/scala/kafka/log/OffsetMap.scala#L43-L43
> >
> > if different log keys have the same digest value, they will be treated as
> > the same key in log compaction. Though the risk of such a thing happening is
> > very small, I still want it to be avoided. If what I thought is wrong
> > please
> > let me know, and I hope to know the thoughts of who created or
> > is maintaining the code.
>


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2017-01-03 Thread radai
I've just re-validated the functionality works - broker throttles under
stress instead of OOMs.

at this point my branch (
https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting)
is "code complete" and somewhat tested and im waiting on the voting process
to come to a conclusion before moving forward.
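
The branch linked above isn't reproduced here, but the core throttle-instead-of-OOM idea — a bounded memory pool whose exhaustion mutes channels rather than allocating more — can be sketched as follows. This is a simplified illustration, not the actual broker classes:

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified illustration of the broker-memory-pool idea: allocation
// requests against a fixed budget either succeed or fail immediately,
// and on failure the caller mutes the channel (stops reading requests)
// instead of buffering without bound and eventually OOMing.
public class BoundedPool {
    private final AtomicLong availableBytes;

    public BoundedPool(long capacityBytes) {
        this.availableBytes = new AtomicLong(capacityBytes);
    }

    public boolean tryAllocate(long bytes) {
        while (true) {
            long avail = availableBytes.get();
            if (bytes > avail) {
                return false; // caller should mute and retry after releases
            }
            if (availableBytes.compareAndSet(avail, avail - bytes)) {
                return true;
            }
        }
    }

    public void release(long bytes) {
        availableBytes.addAndGet(bytes); // muted channels can be re-enabled
    }
}
```

Under stress the pool stays at its cap, incoming sockets are simply not read from, and backpressure propagates to clients instead of the broker dying.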

On Fri, Dec 16, 2016 at 4:46 PM, radai <radai.rosenbl...@gmail.com> wrote:

> I've added the 3 new metrics/sensors i've implemented to the KIP.
>
> at this point I would need to re-validate the functionality (which i
> expect to do early january).
>
> code reviews welcome ;-)
>
> On Mon, Nov 28, 2016 at 10:37 AM, radai <radai.rosenbl...@gmail.com>
> wrote:
>
>> will do (only added a single one so far, the rest TBD)
>>
>> On Mon, Nov 28, 2016 at 10:04 AM, Jun Rao <j...@confluent.io> wrote:
>>
>>> Hi, Radai,
>>>
>>> Could you add a high level description of the newly added metrics to the
>>> KIP wiki?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Wed, Nov 23, 2016 at 3:45 PM, radai <radai.rosenbl...@gmail.com>
>>> wrote:
>>>
>>> > Hi Jun,
>>> >
>>> > I've added the sensor you requested (or at least I think I did )
>>> >
>>> > On Fri, Nov 18, 2016 at 12:37 PM, Jun Rao <j...@confluent.io> wrote:
>>> >
>>> > > KafkaRequestHandlerPool
>>> >
>>>
>>
>>
>


Re: [DISCUSS] KIP-105: Addition of Record Level for Sensors

2016-12-31 Thread radai
link leads to 104. i think this is the correct one -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-105%3A+Addition+of+Record+Level+for+Sensors
?

On Fri, Dec 30, 2016 at 8:31 PM, Aarti Gupta  wrote:

> Hi all,
>
> I would like to start the discussion on KIP-105: Addition of Record Level
> for Sensors
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67636483
>
> Looking forward to your feedback.
>
> Thanks,
> Aarti and Eno
>


Re: custom offsets in ProduceRequest

2016-12-29 Thread radai
or even better - if topic creation is done dynamically by the replicator,
setting the initial offsets for partitions could be made part of topic
creation ? even less API changes this way

On Thu, Dec 29, 2016 at 10:49 PM, radai <radai.rosenbl...@gmail.com> wrote:

> ah, I didnt realize we are limiting the discussion to master --> slave.
>
> but - if we're talking about master-slave replication, and under the
> conditions i outlined earlier (src and dest match in #partitions, no
> foreign writes to dest) it "just works", seems to me the only thing youre
> really missing is not an explicit desired offset param on each and every
> request, but just the ability to "reset" the starting offset on the dest
> cluster at topic creation.
>
> let me try and run through a more detailed scenario:
>
> 1. suppose i set up the original cluster (src). no remote cluster yet.
> lets say over some period of time i produce 1 million msgs to topic X on
> this src cluster.
> 2. company grows, 2nd site is opened, dest cluster is created, topic X is
> created on (brand new) dest cluster.
> 3. offsets are manually set on every partition of X on the dest cluster to
> match either the oldest retained or current offset of the matching
> partition of X in src. in pseudo code:
>
>  for (partI in numPartitions) {
> partIOffset
> if (replicateAllRetainedHistory) {
>partIOffset = src.getOldestRetained(partI)
> } else {
>partIOffset = src.getCurrent(partI) //will not copy over history
> }
> dest.resetStartingOffset(partI, partIOffset)   < new mgmt API
>  }
>
> 4. now you are free to start replicating. under master --> slave
> assumptions offsets will match from this point forward
>
> seems to me something like this could be made part of the replicator
> component (mirror maker, or whatever else you want to use) - if topic X
> does not exist in destination, create it, reset initial offsets to match
> source, start replication
>
> On Thu, Dec 29, 2016 at 12:41 PM, Andrey L. Neporada <
> anepor...@yandex-team.ru> wrote:
>
>>
>> > On 29 Dec 2016, at 20:43, radai <radai.rosenbl...@gmail.com> wrote:
>> >
>> > so, if i follow your suggested logic correctly, there would be some
>> sort of
>> > :
>> >
>> > produce(partition, msg, requestedOffset)
>> >
>>
>> > which would fail if requestedOffset is already taken (by another
>> previous
>> > such explicit call or by another regular call that just happened to get
>> > assigned that offset by the partition leader on the target cluster).
>> >
>>
>> Yes. More formally, my proposal is to extend ProduceRequest by adding
>> MessageSetStartOffset:
>>
>> ProduceRequest => RequiredAcks Timeout [TopicName [Partition
>> MessageSetStartOffset MessageSetSize MessageSet]]
>>   RequiredAcks => int16
>>   Timeout => int32
>>   Partition => int32
>>   MessageSetSize => int32
>>   MessageSetStartOffset => int64
>>
>> If MessageSetStartOffset is -1, ProduceRequest should work exactly as
>> before - i.e. assign next available offset to given MessageSet.
>>
>>
>> > how would you meaningfully handle this failure?
>> >
>> > suppose this happens to some cross-cluster replicator (like mirror
>> maker).
>> > there is no use in retrying. the options would be:
>> >
>> > 1. get the next available offset - which would violate what youre
>> trying to
>> > achieve
>> > 2. skip msgs - so replication is incomplete, any offset "already taken"
>> on
>> > the destination is not replicated from source
>> > 3. stop replication for this partition completely - because starting
>> from
>> > now _ALL_ offsets will be taken - 1 foreign msg ruins everything for the
>> > entire partition.
>> >
>> > none of these options look good to me.
>> >
>> >
>>
>> Since we are discussing master-slave replication, the only client writing
>> to slave cluster is the replicator itself.
>> In this case ProduceRequest failure is some kind of replication logic
>> error - for example when two replication instances are somehow launched for
>> single partition.
>> The best option here is just to stop replication process.
>>
>> So the answer to your question is (3), but this scenario should never
>> happen.
>>
>>
>> >
>> > On Thu, Dec 29, 2016 at 3:22 AM, Andrey L. Neporada <
>> > anepor...@yandex-team.ru> wrote

Re: custom offsets in ProduceRequest

2016-12-29 Thread radai
ah, I didnt realize we are limiting the discussion to master --> slave.

but - if we're talking about master-slave replication, and under the
conditions i outlined earlier (src and dest match in #partitions, no
foreign writes to dest) it "just works", seems to me the only thing youre
really missing is not an explicit desired offset param on each and every
request, but just the ability to "reset" the starting offset on the dest
cluster at topic creation.

let me try and run through a more detailed scenario:

1. suppose i set up the original cluster (src). no remote cluster yet. lets
say over some period of time i produce 1 million msgs to topic X on this
src cluster.
2. company grows, 2nd site is opened, dest cluster is created, topic X is
created on (brand new) dest cluster.
3. offsets are manually set on every partition of X on the dest cluster to
match either the oldest retained or current offset of the matching
partition of X in src. in pseudo code:

 for (partI in numPartitions) {
partIOffset
if (replicateAllRetainedHistory) {
   partIOffset = src.getOldestRetained(partI)
} else {
   partIOffset = src.getCurrent(partI) //will not copy over history
}
dest.resetStartingOffset(partI, partIOffset)   < new mgmt API
 }

4. now you are free to start replicating. under master --> slave
assumptions offsets will match from this point forward

seems to me something like this could be made part of the replicator
component (mirror maker, or whatever else you want to use) - if topic X
does not exist in destination, create it, reset initial offsets to match
source, start replication
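
The step-3 pseudocode above could be fleshed out against in-memory stand-ins. Note that `Cluster`, `oldestRetained`, `current`, and the returned map (meant to feed the proposed resetStartingOffset mgmt API) are all illustrative — none of this is an existing Kafka interface:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical fleshing-out of step 3: compute the starting offset for
// every destination partition from the source cluster, to be fed into a
// (hypothetical) resetStartingOffset management API before replication.
public class OffsetAligner {
    public interface Cluster {
        long oldestRetained(int partition);

        long current(int partition);
    }

    public static Map<Integer, Long> initialOffsets(Cluster src,
                                                    int numPartitions,
                                                    boolean replicateAllRetainedHistory) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (int partI = 0; partI < numPartitions; partI++) {
            long partIOffset = replicateAllRetainedHistory
                    ? src.oldestRetained(partI) // copy retained history too
                    : src.current(partI);       // history will not be copied
            offsets.put(partI, partIOffset);
        }
        return offsets;
    }
}
```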

On Thu, Dec 29, 2016 at 12:41 PM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

>
> > On 29 Dec 2016, at 20:43, radai <radai.rosenbl...@gmail.com> wrote:
> >
> > so, if i follow your suggested logic correctly, there would be some sort
> of
> > :
> >
> > produce(partition, msg, requestedOffset)
> >
>
> > which would fail if requestedOffset is already taken (by another previous
> > such explicit call or by another regular call that just happened to get
> > assigned that offset by the partition leader on the target cluster).
> >
>
> Yes. More formally, my proposal is to extend ProduceRequest by adding
> MessageSetStartOffset:
>
> ProduceRequest => RequiredAcks Timeout [TopicName [Partition
> MessageSetStartOffset MessageSetSize MessageSet]]
>   RequiredAcks => int16
>   Timeout => int32
>   Partition => int32
>   MessageSetSize => int32
>   MessageSetStartOffset => int64
>
> If MessageSetStartOffset is -1, ProduceRequest should work exactly as
> before - i.e. assign next available offset to given MessageSet.
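
The proposed semantics — accept a batch only when its requested start offset is the next available one, with -1 preserving today's behavior — can be modeled as a toy partition log. This is a sketch of the logic only, not wire-protocol code:

```java
// Toy model of a partition leader handling the proposed
// MessageSetStartOffset field: -1 keeps today's behavior (assign the next
// available offset); any other value must equal the next unassigned
// offset, otherwise the produce fails (the "offset already taken" case).
public class PartitionLog {
    private long nextOffset = 0L;

    // returns the offset assigned to the first message, or -1 on conflict
    public synchronized long append(int messageCount, long requestedStartOffset) {
        if (requestedStartOffset != -1L && requestedStartOffset != nextOffset) {
            return -1L;
        }
        long assigned = nextOffset;
        nextOffset += messageCount;
        return assigned;
    }
}
```

In the master-slave scenario discussed here the replicator is the only writer, so the conflict branch firing at all would signal a replication logic error (e.g. two replicator instances on one partition).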
>
>
> > how would you meaningfully handle this failure?
> >
> > suppose this happens to some cross-cluster replicator (like mirror
> maker).
> > there is no use in retrying. the options would be:
> >
> > 1. get the next available offset - which would violate what youre trying
> to
> > achieve
> > 2. skip msgs - so replication is incomplete, any offset "already taken"
> on
> > the destination is not replicated from source
> > 3. stop replication for this partition completely - because starting from
> > now _ALL_ offsets will be taken - 1 foreign msg ruins everything for the
> > entire partition.
> >
> > none of these options look good to me.
> >
> >
>
> Since we are discussing master-slave replication, the only client writing
> to slave cluster is the replicator itself.
> In this case ProduceRequest failure is some kind of replication logic
> error - for example when two replication instances are somehow launched for
> single partition.
> The best option here is just to stop replication process.
>
> So the answer to your question is (3), but this scenario should never
> happen.
>
>
> >
> > On Thu, Dec 29, 2016 at 3:22 AM, Andrey L. Neporada <
> > anepor...@yandex-team.ru> wrote:
> >
> >> Hi!
> >>
> >>> On 27 Dec 2016, at 19:35, radai <radai.rosenbl...@gmail.com> wrote:
> >>>
> >>> IIUC if you replicate from a single source cluster to a single target
> >>> cluster, the topic has the same number of partitions on both, and no
> one
> >>> writes directly to the target cluster (so master --> slave) the offsets
> >>> would be preserved.
> >>>
> >>
> >> Yes, exactly. When you
> >> 1) create topic with the same number of partitions on both master and
> >> slave clusters
> >> 2) write only to master
> >> 3) replicate partition to partition from

Re: custom offsets in ProduceRequest

2016-12-29 Thread radai
so, if i follow your suggested logic correctly, there would be some sort of
:

produce(partition, msg, requestedOffset)

which would fail if requestedOffset is already taken (by another previous
such explicit call or by another regular call that just happened to get
assigned that offset by the partition leader on the target cluster).

how would you meaningfully handle this failure?

suppose this happens to some cross-cluster replicator (like mirror maker).
there is no use in retrying. the options would be:

1. get the next available offset - which would violate what youre trying to
achieve
2. skip msgs - so replication is incomplete, any offset "already taken" on
the destination is not replicated from source
3. stop replication for this partition completely - because starting from
now _ALL_ offsets will be taken - 1 foreign msg ruins everything for the
entire partition.

none of these options look good to me.



On Thu, Dec 29, 2016 at 3:22 AM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

> Hi!
>
> > On 27 Dec 2016, at 19:35, radai <radai.rosenbl...@gmail.com> wrote:
> >
> > IIUC if you replicate from a single source cluster to a single target
> > cluster, the topic has the same number of partitions on both, and no one
> > writes directly to the target cluster (so master --> slave) the offsets
> > would be preserved.
> >
>
> Yes, exactly. When you
> 1) create topic with the same number of partitions on both master and
> slave clusters
> 2) write only to master
> 3) replicate partition to partition from master to slave
> - in this case the offsets will be preserved.
>
> However, you usually already have cluster that works and want to replicate
> some topics to another one.
> IMHO, in this scenario there should be a way to make message offsets equal
> on both clusters.
>
> > but in the general case - how would you handle the case where multiple
> > producers "claim" the same offset ?
>
> The same way as Kafka handles concurrent produce requests for the same
> partition - produce requests for partition are serialized.
> If the next produce request “overlaps” with previous one, it fails.
>
> >
> >
> > On Mon, Dec 26, 2016 at 4:52 AM, Andrey L. Neporada <
> > anepor...@yandex-team.ru> wrote:
> >
> >> Hi all!
> >>
> >> Suppose you have two Kafka clusters and want to replicate topics from
> >> primary cluster to secondary one.
> >> It would be very convenient for readers if the message offsets for
> >> replicated topics would be the same as for primary topics.
> >>
> >> As far as I know, currently there is no way to achieve this.
> >> I wonder is it possible/reasonable to add message offset to
> ProduceRequest?
> >>
> >>
> >> —
> >> Andrey Neporada
> >>
> >>
> >>
> >>
>
>


Re: custom offsets in ProduceRequest

2016-12-27 Thread radai
IIUC if you replicate from a single source cluster to a single target
cluster, the topic has the same number of partitions on both, and no one
writes directly to the target cluster (so master --> slave) the offsets
would be preserved.

but in the general case - how would you handle the case where multiple
producers "claim" the same offset ?


On Mon, Dec 26, 2016 at 4:52 AM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

> Hi all!
>
> Suppose you have two Kafka clusters and want to replicate topics from
> primary cluster to secondary one.
> It would be very convenient for readers if the message offsets for
> replicated topics would be the same as for primary topics.
>
> As far as I know, currently there is no way to achieve this.
> I wonder if it is possible/reasonable to add the message offset to ProduceRequest?
>
>
> —
> Andrey Neporada
>
>
>
>


Re: Different key with the same digest in log compaction

2016-12-22 Thread radai
I think the code assumes that with a "good enough" hash function (and maybe
few enough keys) the chance of such a collision is acceptably small to
justify the savings of not keeping the full keys in memory.
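The "acceptably small" claim can be quantified with the standard birthday bound. A sketch (the 128-bit figure assumes the MD5 digest that the linked OffsetMap.scala uses by default; the class and method names here are illustrative):

```java
/** Birthday-bound estimate of the chance that any two distinct keys share a
 *  digest in the compaction offset map. With a 128-bit digest, even a
 *  billion keys give a vanishingly small collision probability. */
public class DigestCollisionBound {
    /** Upper bound: P(collision among k keys over 2^bits digests) <= k*(k-1) / (2 * 2^bits). */
    static double collisionUpperBound(double keys, int digestBits) {
        return keys * (keys - 1) / (2.0 * Math.pow(2, digestBits));
    }

    public static void main(String[] args) {
        double oneBillionKeys = 1e9;
        double p = collisionUpperBound(oneBillionKeys, 128);
        // For 1e9 keys and 128-bit digests the bound is on the order of 1e-21
        System.out.printf("P(any collision) <= %.3e%n", p);
    }
}
```

At that probability the digest collision is far less likely than, say, undetected disk corruption, which is presumably the trade-off the code is making.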


On Wed, Dec 21, 2016 at 11:50 PM, Renkai  wrote:

> Hi, all:
>
>  I am just learning the Kafka codebase. From what I saw in
> https://github.com/apache/kafka/blob/6ed3e6b1cb8a73b1f5f78926ccb247
> a8953a554c/core/src/main/scala/kafka/log/OffsetMap.scala#L43-L43
>
> if different log keys have the same digest value, they will be treated as
> the same key in log compaction. Though the risk of this happening is very
> small, I would still like it to be avoided. If what I thought is wrong,
> please let me know; I would like to hear the thoughts of whoever created
> or maintains this code.
>
>
>
>


Re: [VOTE] KIP-92 - Add per partition lag metrics to KafkaConsumer

2016-12-21 Thread radai
+1

On Wed, Dec 21, 2016 at 9:51 AM, Dong Lin  wrote:

> +1 (non-binding)
>
> On Thu, Dec 15, 2016 at 5:32 PM, Becket Qin  wrote:
>
> > Hi,
> >
> > I want to start a voting thread on KIP-92 which proposes to add per
> > partition lag metrics to KafkaConsumer. The KIP wiki page is below:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 92+-+Add+per+partition+lag+metrics+to+KafkaConsumer
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
>


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-20 Thread radai
when the leader decides to commit a TX (of X msgs, known at this point), it
writes an "intent to append X msgs" msg (control?) followed by the X msgs
(at this point it is the leader and therefore the point of sync, so this can
be done with no "foreign" msgs in between).
if there's a crash/change of leadership, the new leader can just roll back
(remove what partial contents it had) if it sees the "intent" msg but doesn't
see X msgs belonging to the TX after it. the watermark does not advance
into the middle of a TX - so nothing is visible to any consumer until the
whole thing is committed and replicated (or crashed and rolled back). which
means I don't think TX storage needs replication, and atomicity to consumers
is retained.
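The rollback rule above could be sketched as a recovery scan on leadership change. This is only an illustration of the mail's proposal, not the KIP-98 design; the record types and names are made up:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the recovery rule above: if the log ends with an
 *  "intent to append X msgs" marker followed by fewer than X records, the
 *  new leader truncates the partial transaction. */
public class TxRecovery {
    record Entry(boolean isIntentMarker, int expectedCount, String payload) {
        static Entry intent(int count) { return new Entry(true, count, null); }
        static Entry data(String payload) { return new Entry(false, 0, payload); }
    }

    /** Returns the log with any trailing partial transaction rolled back. */
    static List<Entry> recover(List<Entry> log) {
        // Scan backwards for the most recent intent marker.
        for (int i = log.size() - 1; i >= 0; i--) {
            Entry e = log.get(i);
            if (e.isIntentMarker()) {
                int following = log.size() - 1 - i;
                if (following < e.expectedCount()) {
                    // Fewer than X msgs after the intent: roll back the partial TX.
                    return new ArrayList<>(log.subList(0, i));
                }
                break; // the last TX is complete; nothing to do
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>(List.of(
                Entry.intent(2), Entry.data("m1"), Entry.data("m2"), // committed TX
                Entry.intent(3), Entry.data("m3")));                 // crash mid-append
        System.out.println(recover(log).size()); // 3: the partial TX is removed
    }
}
```

This assumes, as the mail does, that the leader writes the intent marker and its X msgs contiguously, with no foreign msgs interleaved.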

I can't argue with the latency argument, but:

1. if TXs can be done in-mem, maybe a TX per msg isn't that expensive?
2. I think a logical clock approach (with broker-side dedup based on the
clock) could provide the same exactly-once semantics without requiring
transactions at all?

however, I concede that as you describe it (long-running TXs where commits
are actually "checkpoints" spaced to optimize overhead vs RPO/RTO) you
would require read-uncommitted to minimize latency.

On Tue, Dec 20, 2016 at 1:24 PM, Apurva Mehta  wrote:

> durably at the moment we enter the pre-commit phase. If we
> don't have durable persistence of these messages, we can't have idempotent
> and atomic copying into the main  log, and your proposal to date does not
> show otherwise.
>


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-20 Thread radai
obviously anything committed would need to be replicated to all followers -
just like current msgs.

what I'm trying to say is that in-flight data (written as part of an ongoing
TX and not committed yet) does not necessarily need to be replicated, or
even written out to disk. taken to the extreme, it means I can buffer in
memory on the leader alone and incur no extra writes at all.

if you don't want to just buffer in-memory on the leader (or are forced to
spool to disk because of size) you could still avoid a double write by
messing around with segment files (so the TX file becomes part of the
"linked-list" of segment files instead of reading it and appending its
contents verbatim to the current segment file).

the area where this inevitably comes up short is latency and "read
uncommitted" (which are related). the added delay (after cutting all the
corners above) would really be the "time span" of a TX - the amount of time
from the moment the producer started the TX to the time when it was
committed. in my mind this time span is very short. am I failing to
understand the proposed "typical" use case? is the plan to use long-running
transactions and only commit at, say, 5-minute "checkpoints"?

On Tue, Dec 20, 2016 at 10:00 AM, Jay Kreps <j...@confluent.io> wrote:

> Cool. It sounds like you guys will sync up and come up with a specific
> proposal. I think point (3) does require full replication of the pre-commit
> transaction, but I'm not sure, and I would be very happy to learn
> otherwise. That was actually the blocker on that alternate proposal. From
> my point of view 2x overhead is kind of a deal breaker since it makes
> correctness so expensive you'd have to think very hard before turning it
> on, but if there is a way to do it with less and there aren't too many
> other negative side effects that would be very appealing. I think we can
> also dive a bit into why we are so perf and latency sensitive as it relates
> to the stream processing use cases...I'm not sure how much of that is
> obvious from the proposal.
>
> -Jay
>
>
>
>
>
> On Tue, Dec 20, 2016 at 9:11 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> > Just got some time to go through most of this thread and KIP - great to
> see
> > this materialize and discussed!!
> > I will add more comments in the coming days on some of the other "tracks"
> > in this thread; but since Radai brought up the double-journaling approach
> > that we had discussed I thought I would move over some content from
> > our internal
> > wiki on double-journalling
> > <https://cwiki.apache.org/confluence/display/KAFKA/
> > Double+journaling+with+local+data+copy>
> > It is thin on details with a few invalid statements because I don't think
> > we dwelt long enough on it - it was cast aside as being too expensive
> from
> > a storage and latency perspective. As the immediately preceding emails
> > state, I tend to agree that those are compelling enough reasons to take a
> > hit in complexity/increased memory usage in the consumer. Anyway, couple
> of
> > us at LinkedIn can spend some time today brainstorming a little more on
> > this today.
> >
> > 1. on write amplification: i dont see x6 the writes, at worst i see x2
> the
> > > writes - once to the "tx log", then read and again to the destination
> > > partition. if you have some != 1 replication factor than both the 1st
> and
> > > the 2nd writes get replicated, but it is still a relative factor of x2.
> > > what am I missing?
> > >
> >
> > I think that's right - it would be six total copies if we are doing RF 3.
> >
> >
> > > 3. why do writes to a TX need the same guarantees as "plain" writes? in
> > > cases where the user can live with a TX rollback on change of
> > > leadership/broker crash the TX log can be unreplicated, and even live
> in
> > > the leader's memory. that would cut down on writes. this is also an
> > > acceptable default in SQL - if your socket connection to a DB dies
> mid-TX
> > > your TX is toast (mysql is even worse)
> > >
> >
> > I may have misunderstood - while the above may be true for transactions
> > in-flight, it definitely needs the same guarantees at the point of commit
> > and the straightforward way to achieve that is to rely on the same
> > guarantees while the transaction is in flight.
> >
> > 4. even if we replicate the TX log, why do we need to re-read it and
> > > re-write it to the underlying partition? if its already written to disk
> > all
> > > I would need is to make that file the current segment of
