Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Ismael,

Sorry, the response below was in regard to your comments. I got my wires crossed, apologies.

Hi Jun,

I’m happy with the change. I see Jason has updated our KIP; many thanks for this, and thanks for implementing it for us ☺

Cheers,
Mike
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Jun,

Thanks for the comments. I’ve updated the KIP where we are in agreement. My comments:

1) Good point, removed from the interface. See updated KIP.
2) I think Radai’s suggested header(String key) is a cleaner method name, but I'm happy to change it if the community believes lastHeader is better. I’ll keep Radai’s suggested name for now.
3) Agreed, added IllegalStateException to the interface. See updated KIP.
4) For now, the intent is that users simply instantiate the provided implementation of the Header interface, e.g. I would expect a user to invoke “new HeaderRecord(String key, byte[] value)”.
5) Agreed, we should add it later if useful. As with the other arguments for keeping the set of methods limited for now (new methods are easy to add later but cannot be taken away), I think we should avoid adding this. As you note, it is still possible with the currently suggested API.

Cheers,
Mike
Re: [DISCUSS] KIP-82 - Add Record Headers
Jun, the message format you proposed seems reasonable to me. I have a few minor comments with regards to the user facing API:

1. Do we want to expose the `close()` method in the Headers interface? It seems that this method should only be called by the producer after the headers have been passed to the interceptors, so it may make sense to keep it as an internal method in the implementation class.

2. `header(String key)` returns the last header for that key. Maybe we should make it explicit by calling the method `lastHeader(String key)`.

3. I agree with the change to throw an exception if we try to modify the headers when they are in read-only mode. We should specify the exception in the KIP. IllegalStateException, as suggested by Radai, seems reasonable.

4. How do users create a `Header` instance to pass to the `add` method? We could introduce a static `create` method that takes both parameters to the `Header` interface (requires Java 8).

5. There's no method to replace all the headers with a given key so one would have to call `remove` and then `add`. Is the assumption that this is rare? If so, that's probably OK, we can add another method later, if it's useful.

Thanks,
Ismael
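The semantics raised in Ismael's points 1 to 3 and 5 can be sketched as follows. This is only an illustration under stated assumptions, not the KIP's implementation: the class names `SketchHeader` and `SketchHeaders` and the `replace` helper are hypothetical, while `add`, `remove`, `lastHeader(String key)`, `close()` and the use of `IllegalStateException` are the names discussed in the thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical value class standing in for the KIP's Header interface.
final class SketchHeader {
    final String key;
    final byte[] value;

    SketchHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical mutable header collection that becomes read-only after close().
final class SketchHeaders {
    private final List<SketchHeader> headers = new ArrayList<>();
    private boolean readOnly = false;

    // Appends a header, preserving insertion order.
    SketchHeaders add(SketchHeader header) {
        checkWritable();
        headers.add(header);
        return this;
    }

    // Removes every header with the given key.
    SketchHeaders remove(String key) {
        checkWritable();
        headers.removeIf(h -> h.key.equals(key));
        return this;
    }

    // Returns the most recently added header for the key, or null if there is none.
    SketchHeader lastHeader(String key) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            if (headers.get(i).key.equals(key)) {
                return headers.get(i);
            }
        }
        return null;
    }

    // Point 5: replacing all headers for a key is a remove followed by an add.
    SketchHeaders replace(SketchHeader header) {
        return remove(header.key).add(header);
    }

    // Switches to read-only mode; later modifications throw IllegalStateException.
    void close() {
        readOnly = true;
    }

    private void checkWritable() {
        if (readOnly) {
            throw new IllegalStateException("headers are read-only");
        }
    }

    public static void main(String[] args) {
        SketchHeaders headers = new SketchHeaders()
                .add(new SketchHeader("lineage", "dc1".getBytes(StandardCharsets.UTF_8)))
                .add(new SketchHeader("lineage", "dc2".getBytes(StandardCharsets.UTF_8)));
        System.out.println(new String(headers.lastHeader("lineage").value, StandardCharsets.UTF_8));
        headers.close();
        try {
            headers.add(new SketchHeader("late", new byte[0]));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping `close()` out of the public interface, as point 1 suggests, would mean only the producer can flip the read-only flag; in this sketch it is package-private for that reason.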
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi, Everyone,

Jason has been working on the new message format related to EOS (https://github.com/apache/kafka/pull/2614). He has included the header changes proposed in the KIP, which avoids the overhead of supporting an additional message format change if headers were done separately. Since the message format part of the header proposal seems less controversial and the consensus is that headers are needed, does anyone have objections to this? The following is the new record format with headers.

 * Record =>
 *   Length => Varint
 *   Attributes => Int8
 *   TimestampDelta => Varlong
 *   OffsetDelta => Varint
 *   Key => Bytes
 *   Value => Bytes
 *   Headers => [HeaderKey HeaderValue]
 *     HeaderKey => String
 *     HeaderValue => Bytes
 *
 * Note that in this schema, the Bytes and String types use a variable length integer to represent
 * the length of the field. The array type used for the headers also uses a Varint for the number of
 * headers.

Thanks,

Jun
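To make the `Headers => [HeaderKey HeaderValue]` part of Jun's record format concrete, here is a rough sketch of how such an array could be serialized: a varint header count, then a varint-length-prefixed key and value for each header. Everything beyond that layout is an assumption: the class and method names are hypothetical, the varint flavour shown (unsigned, 7 payload bits per byte) is not specified in the thread and may differ from what the pull request uses, and UTF-8 is assumed for the String type.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class HeaderEncodingSketch {

    // Writes a non-negative int as an unsigned varint: 7 payload bits per byte,
    // with the high bit set on every byte except the last (assumed flavour).
    static void writeVarint(int value, ByteArrayOutputStream out) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Encodes the header array: a varint count, then for each header a
    // varint-length-prefixed key (UTF-8 assumed) and a varint-length-prefixed value.
    static byte[] encodeHeaders(List<Map.Entry<String, byte[]>> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(headers.size(), out);
        for (Map.Entry<String, byte[]> header : headers) {
            byte[] key = header.getKey().getBytes(StandardCharsets.UTF_8);
            writeVarint(key.length, out);
            out.write(key, 0, key.length);
            byte[] value = header.getValue();
            writeVarint(value.length, out);
            out.write(value, 0, value.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
        headers.add(new AbstractMap.SimpleEntry<>("schema-id", new byte[]{0, 0, 0, 7}));
        System.out.println(encodeHeaders(headers).length + " bytes");
    }
}
```

A list of entries is used rather than a map because the thread assumes the same key may appear more than once and that order matters.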
Re: [DISCUSS] KIP-82 - Add Record Headers
Thanks Radai. Great to have a concrete example of the intended usage.

Regarding performance, we would need to benchmark, as you said. But there would be a lot of reuse (in essence, we are copying 5 references plus a new object header), so I'd be surprised if that would be the bottleneck compared to some of the other allocations that would be happening in that path. In any case, I think we can leave this aside for now since people also felt that the mutable API would be easier to use.

About ProducerRecord reuse, my understanding is that people do sometimes retry a failed request manually due to the fact that a large retry number doesn't help if a batch is expired in the queue. I believe KIP-91 will help:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer

In addition, KIP-98 (Exactly-once) won't achieve its goal if people do manual retries. So, it seems like it's OK to require people to create a new ProducerRecord if they really need to do manual retries. But we should add a note to the compatibility section of the KIP.

I have a few minor API suggestions. I'll send a follow-up later today, hopefully.

Ismael
Re: [DISCUSS] KIP-82 - Add Record Headers
the common "stack" we envision at linkedin would consist of (at least) the following components that add headers to every outgoing request:

1. auditing/"lineage" - appends a header containing "node" (hostname etc), time (UTC time) and destination (cluster/topic). these accumulate as requests get mirrored between clusters
2. serialization - sets a header containing a schema identifier to allow deserialization
3. client-side encryption - would probably set a header identifying the key/scheme used
4. internal "billing"

there are also several other teams at linkedin that would use headers (although it's unclear yet if via interceptors or by directly manipulating requests)

if headers are made completely immutable (as the entire request object currently is) we would end up copying (parts of) every msg 4 times. I haven't benchmarked but this seems like it would have an impact to me.

looking elsewhere, rabbitMQ and http components both use mutable request objects (rabbitMQ's BasicProperties object, http components' addHeader method).

how common is it right now for instances of ProducerRecord to actually be reused?
do people really have things like public static final ProducerRecord MY_FAVORITE_REQUEST = ... ?
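The trade-off described above, where each component either appends to a shared mutable header list or copies the headers before adding its own, can be sketched roughly like this. It is an illustration only: `LineageSketch`, `Hdr`, `appendStop`, `withStop` and `values` are hypothetical names, header values are Strings for brevity, and nothing here measures the actual cost being debated.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class LineageSketch {

    // Hypothetical header with a String value for brevity.
    static final class Hdr {
        final String key;
        final String value;

        Hdr(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Mutable style: each component appends its own stop to the shared list.
    static void appendStop(List<Hdr> headers, String node) {
        headers.add(new Hdr("lineage", node));
    }

    // Immutable style: each component copies the existing headers before adding.
    static List<Hdr> withStop(List<Hdr> headers, String node) {
        List<Hdr> copy = new ArrayList<>(headers);
        copy.add(new Hdr("lineage", node));
        return Collections.unmodifiableList(copy);
    }

    // Collects the values of all headers with the given key, in insertion order.
    static List<String> values(Iterable<Hdr> headers, String key) {
        List<String> result = new ArrayList<>();
        for (Hdr h : headers) {
            if (h.key.equals(key)) {
                result.add(h.value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Hdr> headers = new ArrayList<>();
        appendStop(headers, "cluster-a");
        appendStop(headers, "cluster-b");
        System.out.println(values(headers, "lineage"));
    }
}
```

With four components in the stack, the immutable style performs four list copies per message, which is the overhead the message above is concerned about; whether it matters in practice is exactly what the thread proposes to benchmark.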
Re: [DISCUSS] KIP-82 - Add Record Headers
> A. content-type
> It seems that in general, content-type should be set at the topic level. Not sure if mixing messages with different content types should be encouraged.
>
> B. schema id
> Since the value is mostly useless without schema id, it seems that storing the schema id together with serialized bytes in the value is better?
>
> C. per message encryption
> One drawback of this approach is that it significantly reduces the effectiveness of compression, which happens on a set of serialized messages. An alternative is to enable SSL for wire encryption and rely on the storage system (e.g. LUKS) for at rest encryption.
>
> D. cluster ID for mirroring across Kafka clusters
> This is actually interesting. Today, to avoid introducing cycles when doing mirroring across data centers, one would either have to set up two Kafka clusters (a local and an aggregate) per data center or rename topics. Neither is ideal. With headers, the producer could tag each message with the producing cluster ID in the header. MirrorMaker could then avoid mirroring messages to a cluster if they are tagged with the same cluster id.
>
> However, an alternative approach is to introduce something like hierarchical topics and store messages from different clusters in different partitions under the same topic. This approach avoids filtering out unneeded data and makes offset preserving easier to support. It may make compaction trickier though since the same key may show up in different partitions.
>
> E. record-level lineage
> For example
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Ismael,

Yes, it makes sense to do a benchmark. My concern was based on the observation in KAFKA-3994, where we saw a GC problem when creating new lists in the purgatory.

Thanks,

Jiangjie (Becket) Qin
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Becket,

Sorry for the delay and thanks for your comments. Comments inline.

On Wed, Mar 1, 2017 at 8:59 PM, Becket Qin wrote:

> > The difference is that the user chooses the value type. They are free to choose a mutable or immutable type. A generic interceptor cannot mutate the value because it doesn't know the type (and it could be immutable). One could write an interceptor that checked the type of the value at runtime and did things based on that. But again, the user who creates the record is in control.
>
> But there is no generic interceptor, right? The interceptor always takes specific K, V type.

No, generic interceptors exist, are very useful and we use them at Confluent.

> If we want to let the users control the mutability, users can always call headers.close() before calling producer.send() and that will force the interceptor to create new ProducerRecord object.

If we expect users to call `close()`, then this is an important point to document because interceptors would have to catch an exception or check if the record has already been closed. I'm not sure if we should allow or encourage that.

> If we do not allow the users to add headers on existing ProducerRecord objects, each interceptor who wants to add headers will have to create a new ProducerRecord. So we will have to create NUM_INTERCEPTOR times of ProducerRecord, if a producer is sending 100K messages per second, it would be a lot of new objects, right?

Note we are allocating one Header instance per header. If you are concerned about the number of ProducerRecord instances, then it seems like you should also be concerned about the number of Header instances, which is likely to be a multiple of the former. In addition, if we use a Map to hold the headers, we'll also need to allocate the map entry instances. This is not to say that we should be unnecessarily wasteful, but it's worth understanding what we're trying to achieve to make sure that we are not solving the wrong bottleneck.

It may make sense to do some benchmarking during the implementation phase to verify that the solution achieves whatever performance goal we're aiming for.

Thanks,
Ismael
Re: [DISCUSS] KIP-82 - Add Record Headers
just to clarify - ListIterator is a nice API, and doesn't constrain the implementation a lot more than Iterator (especially if we implement previous() very inefficiently :-) ), but changing

    Iterable<Header> headers(String key)

into

    ListIterator<Header> headers(String key)

would lose us the ability to easily write what i think is the most common case - a for each loop:

    for (Header stop : headers("lineage")) {
        //examine stop
    }

On Tue, Mar 7, 2017 at 12:31 PM, radai wrote:

> ing, as you call it, would probably be implemente
Re: [DISCUSS] KIP-82 - Add Record Headers
where do you see insert-in-the-middle/replace being commonly used?

lineage tracing, as you call it, would probably be implemented by way of:

1. every "stop" along the way appending itself (at the end)
2. some replication technologies, instead of just doing #1, may clear out everything when they replicate (starting from a clean slate)

On Mon, Mar 6, 2017 at 11:00 AM, Colin McCabe wrote:

> As others have mentioned, it seems clear that we want to preserve the ordering of message headers, so that we can implement things like lineage tracing. (For example, each stage could add a "lineage:" header.) I also think that we want the ability to add and remove headers as needed. It would be really unfortunate if the only way to remove a message header or add a header at a certain position was to duplicate the whole message and re-create everything.
>
> So how about implementing ListIterator?
> https://docs.oracle.com/javase/7/docs/api/java/util/ListIterator.html
> It supports adding and removing things at arbitrary positions. For people who want to use it as a simple Iterator, it is one (and you can use all the fancy syntax such as Java's foreach with it). For people who want to add and remove things at arbitrary locations, they can. And it doesn't expose the implementation, so that can be changed later. We can materialize things in memory lazily if we want to, and so forth. I think using the standard interface is better than rolling our own nonstandard collection or iterator interfaces.
>
> regards,
> Colin
>
> On Wed, Mar 1, 2017, at 12:59, Becket Qin wrote:
> > Hi Ismael,
> >
> > Thanks for the reply. Please see the comments inline.
> >
> > On Wed, Mar 1, 2017 at 6:47 AM, Ismael Juma wrote:
> >
> > > Hi Becket,
> > >
> > > Thanks for sharing your thoughts. More inline.
> > >
> > > On Wed, Mar 1, 2017 at 2:54 AM, Becket Qin wrote:
> > >
> > > > As you can imagine if the ProducerRecord has a value as a List and the Interceptor.onSend() can actually add an element to the List. If the producer.send() is called on the same ProducerRecord again, the value list would have one more element than the previously sent ProducerRecord even though the ProducerRecord itself is not mutable, right? Same thing can apply to any modifiable class type.
> > >
> > > The difference is that the user chooses the value type. They are free to choose a mutable or immutable type. A generic interceptor cannot mutate the value because it doesn't know the type (and it could be immutable). One could write an interceptor that checked the type of the value at runtime and did things based on that. But again, the user who creates the record is in control.
> >
> > But there is no generic interceptor, right? The interceptor always takes specific K, V type.
> >
> > > > From this standpoint allowing headers to be mutable doesn't really weaken the mutability we already have. Admittedly a mutable header is kind of guiding user towards to change the headers in the existing object instead of creating a new one.
> > >
> > > Yes, with headers, we are providing a model for the user (the user doesn't get to choose it like with keys and values) and for the interceptors. So, I think it's not the same.
> > >
> > > > But I think reusing an object while it is possible to be modified by user code is a risk that users themselves are willing to take. And we do not really protect against that.
> > >
> > > If users want to take that risk, it's fine. But we give them the option to protect themselves. With mutable headers, there is no option.
> >
> > If we want to let the users control the mutability, users can always call headers.close() before calling producer.send() and that will force the interceptor to create new ProducerRecord object.
> >
> > Because the headers are mostly useful for interceptors, unless the users do not want the interceptors to change their records, it seems reasonable to say that by default modification of headers are allowed for the interceptors.
> >
> > > > But there still seems value to allow the users to not pay the overhead of creating tons of objects if they do not reuse an object to send it twice, which is probably a more common case.
> > >
> > > I think the assumption that there would be tons of objects created is incorrect (I suggested a solution that would only involve one additional reference in the `Header` instance). The usability of the immutable API seemed to be more of an issue.
> >
> > If we do not allow the users to add headers on existing ProducerRecord objects, each interceptor who wants to add headers will have to create a new ProducerRecord. So we will have to create NUM_INTERCEPTOR
Re: [DISCUSS] KIP-82 - Add Record Headers
As others have mentioned, it seems clear that we want to preserve the ordering of message headers, so that we can implement things like lineage tracing. (For example, each stage could add a "lineage:" header.) I also think that we want the ability to add and remove headers as needed. It would be really unfortunate if the only way to remove a message header or add a header at a certain position was to duplicate the whole message and re-create everything. So how about implementing ListIterator? https://docs.oracle.com/javase/7/docs/api/java/util/ListIterator.html It supports adding and removing things at arbitrary positions. For people who want to use it as a simple Iterator, it is one (and you can use all the fancy syntax such as Java's foreach with it). For people who want to add and remove things at arbitrary locations, they can. And it doesn't expose the implementation, so that can be changed later. We can materialize things in memory lazily if we want to, and so forth. I think using the standard interface is better than rolling our own nonstandard collection or iterator interfaces. regards, Colin On Wed, Mar 1, 2017, at 12:59, Becket Qin wrote: > Hi Ismael, > > Thanks for the reply. Please see the comments inline. > > On Wed, Mar 1, 2017 at 6:47 AM, Ismael Juma wrote: > > > Hi Becket, > > > > Thanks for sharing your thoughts. More inline. > > > > On Wed, Mar 1, 2017 at 2:54 AM, Becket Qin wrote: > > > > > As you can imagine if the ProducerRecord has a value as a List and the > > > Interceptor.onSend() can actually add an element to the List. If the > > > producer.send() is called on the same ProducerRecord again, the value > > list > > > would have one more element than the previously sent ProducerRecord even > > > though the ProducerRecord itself is not mutable, right? Same thing can > > > apply to any modifiable class type. > > > > > > > The difference is that the user chooses the value type. They are free to > > choose a mutable or immutable type. 
A generic interceptor cannot mutate the > > value because it doesn't know the type (and it could be immutable). One > > could write an interceptor that checked the type of the value at runtime > > and did things based on that. But again, the user who creates the record is > > in control. > > > But there is no generic interceptor, right? The interceptor always takes > specific K, V type. > > > > From this standpoint allowing headers to be mutable doesn't really weaken > > > the mutability we already have. Admittedly a mutable header is kind of > > > guiding user towards to change the headers in the existing object instead > > > of creating a new one. > > > > > > Yes, with headers, we are providing a model for the user (the user doesn't > > get to choose it like with keys and values) and for the interceptors. So, I > > think it's not the same. > > > > > > > But I think reusing an object while it is possible > > > to be modified by user code is a risk that users themselves are willing > > to > > > take. And we do not really protect against that. > > > > > > If users want to take that risk, it's fine. But we give them the option to > > protect themselves. With mutable headers, there is no option. > > If we want to let the users control the mutability, users can always call > headers.close() before calling producer.send() and that will force the > interceptor to create new ProducerRecord object. > > Because the headers are mostly useful for interceptors, unless the users > do > not want the interceptors to change their records, it seems reasonable to > say that by default modification of headers are allowed for the > interceptors. > > > > > > > > But there still seems > > > value to allow the users to not pay the overhead of creating tons of > > > objects if they do not reuse an object to send it twice, which is > > probably > > > a more common case. 
> > > > > > > I think the assumption that there would be tons of objects created is > > incorrect (I suggested a solution that would only involve one additional > > reference in the `Header` instance). The usability of the immutable API > > seemed to be more of an issue. > > > If we do not allow the users to add headers on existing ProducerRecord > objects, each interceptor who wants to add headers will have to create a > new ProducerRecord. So we will have to create NUM_INTERCEPTOR times of > ProducerRecord, if a producer is sending 100K messages per second, it > would > be a lot of new objects, right? > > > > > In any case, if we do add the `close()` method, then we need to add a note > > to the compatibility section stating that once a producer record is sent, > > it cannot be sent again as this would cause interceptors that add headers > > to fail. > > > Agreed, clear documentation is important. > > > > > Ismael > >
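For readers following the ListIterator suggestion, here is a minimal sketch of what "add and remove at arbitrary positions" looks like with the standard java.util.ListIterator. The SimpleHeader type, the rewrite() helper and the header keys are hypothetical names made up for illustration; nothing here is the KIP-82 API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

// Hypothetical header type for illustration only; not the KIP-82 interface.
final class SimpleHeader {
    final String key;
    final String value;

    SimpleHeader(String key, String value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        return key + "=" + value;
    }
}

final class ListIteratorHeadersDemo {
    // Works on a copy: drops every header whose key is removeKey and inserts
    // `inserted` directly after each header whose key is anchorKey.
    static List<SimpleHeader> rewrite(List<SimpleHeader> headers, String removeKey,
                                      String anchorKey, SimpleHeader inserted) {
        List<SimpleHeader> copy = new ArrayList<>(headers);
        ListIterator<SimpleHeader> it = copy.listIterator();
        while (it.hasNext()) {
            SimpleHeader h = it.next();
            if (h.key.equals(removeKey)) {
                it.remove();          // removes the element just returned by next()
            } else if (h.key.equals(anchorKey)) {
                it.add(inserted);     // inserted before the cursor, so it is not revisited
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        List<SimpleHeader> headers = Arrays.asList(
                new SimpleHeader("trace", "t1"),
                new SimpleHeader("debug", "tmp"),
                new SimpleHeader("lineage", "stage1"));
        System.out.println(rewrite(headers, "debug", "trace",
                new SimpleHeader("lineage", "stage0")));
    }
}
```

One caveat on the foreach remark: Java's for-each statement needs an Iterable (or an array), not an Iterator, so a Headers type would presumably still implement Iterable and hand out the ListIterator from a separate method.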
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Ismael, Thanks for the reply. Please see the comments inline. On Wed, Mar 1, 2017 at 6:47 AM, Ismael Juma wrote: > Hi Becket, > > Thanks for sharing your thoughts. More inline. > > On Wed, Mar 1, 2017 at 2:54 AM, Becket Qin wrote: > > > As you can imagine if the ProducerRecord has a value as a List and the > > Interceptor.onSend() can actually add an element to the List. If the > > producer.send() is called on the same ProducerRecord again, the value > list > > would have one more element than the previously sent ProducerRecord even > > though the ProducerRecord itself is not mutable, right? Same thing can > > apply to any modifiable class type. > > > > The difference is that the user chooses the value type. They are free to > choose a mutable or immutable type. A generic interceptor cannot mutate the > value because it doesn't know the type (and it could be immutable). One > could write an interceptor that checked the type of the value at runtime > and did things based on that. But again, the user who creates the record is > in control. > But there is no generic interceptor, right? The interceptor always takes specific K, V type. > From this standpoint allowing headers to be mutable doesn't really weaken > > the mutability we already have. Admittedly a mutable header is kind of > > guiding user towards to change the headers in the existing object instead > > of creating a new one. > > > Yes, with headers, we are providing a model for the user (the user doesn't > get to choose it like with keys and values) and for the interceptors. So, I > think it's not the same. > > > But I think reusing an object while it is possible > > to be modified by user code is a risk that users themselves are willing > to > > take. And we do not really protect against that. > > > If users want to take that risk, it's fine. But we give them the option to > protect themselves. With mutable headers, there is no option. 
If we want to let the users control the mutability, users can always call headers.close() before calling producer.send() and that will force the interceptor to create new ProducerRecord object. Because the headers are mostly useful for interceptors, unless the users do not want the interceptors to change their records, it seems reasonable to say that by default modification of headers are allowed for the interceptors. > > > > But there still seems > > value to allow the users to not pay the overhead of creating tons of > > objects if they do not reuse an object to send it twice, which is > probably > > a more common case. > > > > I think the assumption that there would be tons of objects created is > incorrect (I suggested a solution that would only involve one additional > reference in the `Header` instance). The usability of the immutable API > seemed to be more of an issue. > If we do not allow the users to add headers on existing ProducerRecord objects, each interceptor who wants to add headers will have to create a new ProducerRecord. So we will have to create NUM_INTERCEPTOR times of ProducerRecord, if a producer is sending 100K messages per second, it would be a lot of new objects, right? > > In any case, if we do add the `close()` method, then we need to add a note > to the compatibility section stating that once a producer record is sent, > it cannot be sent again as this would cause interceptors that add headers > to fail. > Agreed, clear documentation is important. > > Ismael >
Re: [DISCUSS] KIP-82 - Add Record Headers
@michael: i used void because i'm used to java beans. thinking about it, i don't see much use in returning false from adding a header: if the headers are in read-only mode you should probably throw an IllegalStateException because let's face it, 99% of users don't check return values. returning "this" is probably more useful because it would allow chaining: Headers.add().add().remove() On Wed, Mar 1, 2017 at 6:47 AM, Ismael Juma wrote: > Hi Becket, > > Thanks for sharing your thoughts. More inline. > > On Wed, Mar 1, 2017 at 2:54 AM, Becket Qin wrote: > > > As you can imagine if the ProducerRecord has a value as a List and the > > Interceptor.onSend() can actually add an element to the List. If the > > producer.send() is called on the same ProducerRecord again, the value > list > > would have one more element than the previously sent ProducerRecord even > > though the ProducerRecord itself is not mutable, right? Same thing can > > apply to any modifiable class type. > > > > The difference is that the user chooses the value type. They are free to > choose a mutable or immutable type. A generic interceptor cannot mutate the > value because it doesn't know the type (and it could be immutable). One > could write an interceptor that checked the type of the value at runtime > and did things based on that. But again, the user who creates the record is > in control. > > From this standpoint allowing headers to be mutable doesn't really weaken > > the mutability we already have. Admittedly a mutable header is kind of > > guiding user towards to change the headers in the existing object instead > > of creating a new one. > > > Yes, with headers, we are providing a model for the user (the user doesn't > get to choose it like with keys and values) and for the interceptors. So, I > think it's not the same. > > > > But I think reusing an object while it is possible > > to be modified by user code is a risk that users themselves are willing > to > > take. 
And we do not really protect against that. > > > If users want to take that risk, it's fine. But we give them the option to > protect themselves. With mutable headers, there is no option. > > > > But there still seems > > value to allow the users to not pay the overhead of creating tons of > > objects if they do not reuse an object to send it twice, which is > probably > > a more common case. > > > > I think the assumption that there would be tons of objects created is > incorrect (I suggested a solution that would only involve one additional > reference in the `Header` instance). The usability of the immutable API > seemed to be more of an issue. > > In any case, if we do add the `close()` method, then we need to add a note > to the compatibility section stating that once a producer record is sent, > it cannot be sent again as this would cause interceptors that add headers > to fail. > > Ismael >
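A rough sketch of the chaining idea above, assuming a list-backed container. FluentHeaders and its method signatures are hypothetical and chosen only to show the shape: add/remove return "this", and a write in read-only mode throws IllegalStateException instead of returning false.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; class and method names are illustrative, not the KIP-82 API.
final class FluentHeaders {
    private final List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
    private boolean readOnly;

    // Appends a header and returns this so calls can be chained.
    FluentHeaders add(String key, byte[] value) {
        checkWritable();
        entries.add(new SimpleImmutableEntry<>(key, value));
        return this;
    }

    // Removes all headers under the key and returns this for chaining.
    FluentHeaders remove(String key) {
        checkWritable();
        entries.removeIf(e -> e.getKey().equals(key));
        return this;
    }

    // Switches the headers to read-only mode.
    void close() {
        readOnly = true;
    }

    int size() {
        return entries.size();
    }

    private void checkWritable() {
        if (readOnly) {
            throw new IllegalStateException("headers are read-only");
        }
    }

    public static void main(String[] args) {
        FluentHeaders headers = new FluentHeaders()
                .add("a", new byte[0])
                .add("b", new byte[0])
                .remove("a");
        System.out.println(headers.size());
    }
}
```

With a boolean return the caller has to remember to check the result of every call; with the exception, a forgotten check cannot silently drop a header.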
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Becket, Thanks for sharing your thoughts. More inline. On Wed, Mar 1, 2017 at 2:54 AM, Becket Qin wrote: > As you can imagine if the ProducerRecord has a value as a List and the > Interceptor.onSend() can actually add an element to the List. If the > producer.send() is called on the same ProducerRecord again, the value list > would have one more element than the previously sent ProducerRecord even > though the ProducerRecord itself is not mutable, right? Same thing can > apply to any modifiable class type. > The difference is that the user chooses the value type. They are free to choose a mutable or immutable type. A generic interceptor cannot mutate the value because it doesn't know the type (and it could be immutable). One could write an interceptor that checked the type of the value at runtime and did things based on that. But again, the user who creates the record is in control. >From this standpoint allowing headers to be mutable doesn't really weaken > the mutability we already have. Admittedly a mutable header is kind of > guiding user towards to change the headers in the existing object instead > of creating a new one. Yes, with headers, we are providing a model for the user (the user doesn't get to choose it like with keys and values) and for the interceptors. So, I think it's not the same. > But I think reusing an object while it is possible > to be modified by user code is a risk that users themselves are willing to > take. And we do not really protect against that. If users want to take that risk, it's fine. But we give them the option to protect themselves. With mutable headers, there is no option. > But there still seems > value to allow the users to not pay the overhead of creating tons of > objects if they do not reuse an object to send it twice, which is probably > a more common case. 
> I think the assumption that there would be tons of objects created is incorrect (I suggested a solution that would only involve one additional reference in the `Header` instance). The usability of the immutable API seemed to be more of an issue. In any case, if we do add the `close()` method, then we need to add a note to the compatibility section stating that once a producer record is sent, it cannot be sent again as this would cause interceptors that add headers to fail. Ismael
Re: [DISCUSS] KIP-82 - Add Record Headers
Sent too early: Hi Radai: RE: Header header(String key) - returns JUST ONE (the very last) value given a key Iterable headers(String key) - returns ALL under a key Iterable headers() - returns all, period. maybe allow null as key to prev method instead? void add(Header header) - appends a header (key inside). void remove(String key) - removes ALL HEADERS under a key. I don't think this one is needed: "Iterable headers() - returns all, period. maybe allow null as key to prev method instead" The class Headers implements Iterable anyhow. On another note, regarding the return types of add and remove: I note you put void. Any particular reason? For add, I would normally expect either a boolean indicating whether it succeeded (the classical case) or the instance itself returned, as in the "with" pattern. This avoids having to throw an exception when the headers are read-only; add can simply return false (in the classical case). Likewise, for remove I would normally expect a boolean or the previous object to be returned, so if the operation didn't succeed I get either false or null. Any objections if I change the return types of these to boolean to denote whether the operation succeeded? Cheers Mike From: Michael Pearce Sent: Wednesday, March 1, 2017 5:55 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-82 - Add Record Headers Hi Radai: RE: Header header(String key) - returns JUST ONE (the very last) value given a key Iterable headers(String key) - returns ALL under a key void add(Header header) - appends a header (key inside). void remove(String key) - removes ALL HEADERS under a key. I don't think this one is needed: From: Becket Qin Sent: Wednesday, March 1, 2017 2:54 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-82 - Add Record Headers Hi Ismael, Yes, there is a difference between Batch and Headers. I was just trying to see if that would work. Good point about sending the same ProducerRecord twice, but in fact in that case any reuse of objects would cause problem. 
As you can imagine if the ProducerRecord has a value as a List and the Interceptor.onSend() can actually add an element to the List. If the producer.send() is called on the same ProducerRecord again, the value list would have one more element than the previously sent ProducerRecord even though the ProducerRecord itself is not mutable, right? Same thing can apply to any modifiable class type. >From this standpoint allowing headers to be mutable doesn't really weaken the mutability we already have. Admittedly a mutable header is kind of guiding user towards to change the headers in the existing object instead of creating a new one. But I think reusing an object while it is possible to be modified by user code is a risk that users themselves are willing to take. And we do not really protect against that. But there still seems value to allow the users to not pay the overhead of creating tons of objects if they do not reuse an object to send it twice, which is probably a more common case. Thanks, Jiangjie (Becket) Qin On Tue, Feb 28, 2017 at 12:43 PM, radai wrote: > I will settle for any API really, but just wanted to point out that as it > stands right now the API targets the most "advanced" (hence obscure and > rare) use cases, at the expense of the simple and common ones. i'd suggest > (as the minimal set): > > Header header(String key) - returns JUST ONE (the very last) value given a > key > Iterable headers(String key) - returns ALL under a key > Iterable headers() - returns all, period. maybe allow null as key > to prev method instead? > void add(Header header) - appends a header (key inside). > void remove(String key) - removes ALL HEADERS under a key. > > this way naive get/set semantics map to header(key)/add(Header) cleanly and > simply while preserving the ability to handle more advanced use cases. 
> we can always add more convenience methods (like those dealing with lists - > addAll etc) but i think the 5 (potentially 4) above are sufficient for > basically everything. > > On Tue, Feb 28, 2017 at 4:08 AM, Ismael Juma wrote: > > > Hi Becket, > > > > Comments inline. > > > > On Sat, Feb 25, 2017 at 10:33 PM, Becket Qin > wrote: > > > > > > 1. Regarding the mutability. > > > > > > I think it would be a big convenience to have headers mutable during > > > certain stage in the message life cycle for the use cases you > mentioned. > > I > > > agree there is a material benefit especially given that we may have to > > > modify the headers for each message. > > > > > > That said, I also think it is fair to say that in the producer, in > order > > to > > > guarantee the c
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Radai: RE: Header header(String key) - returns JUST ONE (the very last) value given a key Iterable headers(String key) - returns ALL under a key void add(Header header) - appends a header (key inside). void remove(String key) - removes ALL HEADERS under a key. I don't think this one is needed: From: Becket Qin Sent: Wednesday, March 1, 2017 2:54 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-82 - Add Record Headers Hi Ismael, Yes, there is a difference between Batch and Headers. I was just trying to see if that would work. Good point about sending the same ProducerRecord twice, but in fact in that case any reuse of objects would cause problem. As you can imagine if the ProducerRecord has a value as a List and the Interceptor.onSend() can actually add an element to the List. If the producer.send() is called on the same ProducerRecord again, the value list would have one more element than the previously sent ProducerRecord even though the ProducerRecord itself is not mutable, right? Same thing can apply to any modifiable class type. >From this standpoint allowing headers to be mutable doesn't really weaken the mutability we already have. Admittedly a mutable header is kind of guiding user towards to change the headers in the existing object instead of creating a new one. But I think reusing an object while it is possible to be modified by user code is a risk that users themselves are willing to take. And we do not really protect against that. But there still seems value to allow the users to not pay the overhead of creating tons of objects if they do not reuse an object to send it twice, which is probably a more common case. Thanks, Jiangjie (Becket) Qin On Tue, Feb 28, 2017 at 12:43 PM, radai wrote: > I will settle for any API really, but just wanted to point out that as it > stands right now the API targets the most "advanced" (hence obscure and > rare) use cases, at the expense of the simple and common ones. 
i'd suggest > (as the minimal set): > > Header header(String key) - returns JUST ONE (the very last) value given a > key > Iterable headers(String key) - returns ALL under a key > Iterable headers() - returns all, period. maybe allow null as key > to prev method instead? > void add(Header header) - appends a header (key inside). > void remove(String key) - removes ALL HEADERS under a key. > > this way naive get/set semantics map to header(key)/add(Header) cleanly and > simply while preserving the ability to handle more advanced use cases. > we can always add more convenience methods (like those dealing with lists - > addAll etc) but i think the 5 (potentially 4) above are sufficient for > basically everything. > > On Tue, Feb 28, 2017 at 4:08 AM, Ismael Juma wrote: > > > Hi Becket, > > > > Comments inline. > > > > On Sat, Feb 25, 2017 at 10:33 PM, Becket Qin > wrote: > > > > > > 1. Regarding the mutability. > > > > > > I think it would be a big convenience to have headers mutable during > > > certain stage in the message life cycle for the use cases you > mentioned. > > I > > > agree there is a material benefit especially given that we may have to > > > modify the headers for each message. > > > > > > That said, I also think it is fair to say that in the producer, in > order > > to > > > guarantee the correctness of the entire logic, it is necessary that at > > some > > > point we need to make producer record immutable. For example we > probably > > > don't want to see that users accidentally updated the headers when the > > > producer is doing the serialization or compression. > > > > > > Given that, would it be possible to make Headers to be able to switch > > from > > > mutable to immutable? We have done this for the Batch in the producer. > > For > > > example, initially the headers are mutable, but after it has gone > through > > > all the interceptors, we can call Headers.close() to make it immutable > > > afterwards. 
> > > > > > The difference is that the batch is an internal class that is not exposed > > to users. Can you please explain what happens if a user tries to send the > > same ProducerRecord twice? Would an interceptor fail when trying to > mutate > > the header that is now closed? Or did you have something else in mind? > > > > Thanks, > > Ismael > > >
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Ismael, Yes, there is a difference between Batch and Headers. I was just trying to see if that would work. Good point about sending the same ProducerRecord twice, but in fact in that case any reuse of objects would cause problem. As you can imagine if the ProducerRecord has a value as a List and the Interceptor.onSend() can actually add an element to the List. If the producer.send() is called on the same ProducerRecord again, the value list would have one more element than the previously sent ProducerRecord even though the ProducerRecord itself is not mutable, right? Same thing can apply to any modifiable class type. >From this standpoint allowing headers to be mutable doesn't really weaken the mutability we already have. Admittedly a mutable header is kind of guiding user towards to change the headers in the existing object instead of creating a new one. But I think reusing an object while it is possible to be modified by user code is a risk that users themselves are willing to take. And we do not really protect against that. But there still seems value to allow the users to not pay the overhead of creating tons of objects if they do not reuse an object to send it twice, which is probably a more common case. Thanks, Jiangjie (Becket) Qin On Tue, Feb 28, 2017 at 12:43 PM, radai wrote: > I will settle for any API really, but just wanted to point out that as it > stands right now the API targets the most "advanced" (hence obscure and > rare) use cases, at the expense of the simple and common ones. i'd suggest > (as the minimal set): > > Header header(String key) - returns JUST ONE (the very last) value given a > key > Iterable headers(String key) - returns ALL under a key > Iterable headers() - returns all, period. maybe allow null as key > to prev method instead? > void add(Header header) - appends a header (key inside). > void remove(String key) - removes ALL HEADERS under a key. 
> > this way naive get/set semantics map to header(key)/add(Header) cleanly and > simply while preserving the ability to handle more advanced use cases. > we can always add more convenience methods (like those dealing with lists - > addAll etc) but i think the 5 (potentially 4) above are sufficient for > basically everything. > > On Tue, Feb 28, 2017 at 4:08 AM, Ismael Juma wrote: > > > Hi Becket, > > > > Comments inline. > > > > On Sat, Feb 25, 2017 at 10:33 PM, Becket Qin > wrote: > > > > > > 1. Regarding the mutability. > > > > > > I think it would be a big convenience to have headers mutable during > > > certain stage in the message life cycle for the use cases you > mentioned. > > I > > > agree there is a material benefit especially given that we may have to > > > modify the headers for each message. > > > > > > That said, I also think it is fair to say that in the producer, in > order > > to > > > guarantee the correctness of the entire logic, it is necessary that at > > some > > > point we need to make producer record immutable. For example we > probably > > > don't want to see that users accidentally updated the headers when the > > > producer is doing the serialization or compression. > > > > > > Given that, would it be possible to make Headers to be able to switch > > from > > > mutable to immutable? We have done this for the Batch in the producer. > > For > > > example, initially the headers are mutable, but after it has gone > through > > > all the interceptors, we can call Headers.close() to make it immutable > > > afterwards. > > > > > > > The difference is that the batch is an internal class that is not exposed > > to users. Can you please explain what happens if a user tries to send the > > same ProducerRecord twice? Would an interceptor fail when trying to > mutate > > the header that is now closed? Or did you have something else in mind? > > > > Thanks, > > Ismael > > >
Re: [DISCUSS] KIP-82 - Add Record Headers
I will settle for any API really, but just wanted to point out that as it stands right now the API targets the most "advanced" (hence obscure and rare) use cases, at the expense of the simple and common ones. i'd suggest (as the minimal set): Header header(String key) - returns JUST ONE (the very last) value given a key Iterable headers(String key) - returns ALL under a key Iterable headers() - returns all, period. maybe allow null as key to prev method instead? void add(Header header) - appends a header (key inside). void remove(String key) - removes ALL HEADERS under a key. this way naive get/set semantics map to header(key)/add(Header) cleanly and simply while preserving the ability to handle more advanced use cases. we can always add more convenience methods (like those dealing with lists - addAll etc) but i think the 5 (potentially 4) above are sufficient for basically everything. On Tue, Feb 28, 2017 at 4:08 AM, Ismael Juma wrote: > Hi Becket, > > Comments inline. > > On Sat, Feb 25, 2017 at 10:33 PM, Becket Qin wrote: > > > > 1. Regarding the mutability. > > > > I think it would be a big convenience to have headers mutable during > > certain stage in the message life cycle for the use cases you mentioned. > I > > agree there is a material benefit especially given that we may have to > > modify the headers for each message. > > > > That said, I also think it is fair to say that in the producer, in order > to > > guarantee the correctness of the entire logic, it is necessary that at > some > > point we need to make producer record immutable. For example we probably > > don't want to see that users accidentally updated the headers when the > > producer is doing the serialization or compression. > > > > Given that, would it be possible to make Headers to be able to switch > from > > mutable to immutable? We have done this for the Batch in the producer. 
> For > > example, initially the headers are mutable, but after it has gone through > > all the interceptors, we can call Headers.close() to make it immutable > > afterwards. > > > > The difference is that the batch is an internal class that is not exposed > to users. Can you please explain what happens if a user tries to send the > same ProducerRecord twice? Would an interceptor fail when trying to mutate > the header that is now closed? Or did you have something else in mind? > > Thanks, > Ismael >
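To make the proposed minimal set concrete, here is one possible list-backed sketch. DemoHeader and ListBackedHeaders are hypothetical names; the "returns all" method is covered by implementing Iterable, as suggested elsewhere in the thread, and returning null for a missing key is an assumption, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical header type for illustration only.
final class DemoHeader {
    final String key;
    final byte[] value;

    DemoHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical list-backed sketch of the proposed minimal method set.
final class ListBackedHeaders implements Iterable<DemoHeader> {
    private final List<DemoHeader> list = new ArrayList<>();

    // Returns JUST ONE (the very last) header for the key, or null if absent (assumption).
    DemoHeader header(String key) {
        for (int i = list.size() - 1; i >= 0; i--) {
            if (list.get(i).key.equals(key)) {
                return list.get(i);
            }
        }
        return null;
    }

    // Returns ALL headers under the key, in insertion order.
    Iterable<DemoHeader> headers(String key) {
        List<DemoHeader> matches = new ArrayList<>();
        for (DemoHeader h : list) {
            if (h.key.equals(key)) {
                matches.add(h);
            }
        }
        return matches;
    }

    // Appends a header (key inside).
    void add(DemoHeader header) {
        list.add(header);
    }

    // Removes ALL headers under the key.
    void remove(String key) {
        list.removeIf(h -> h.key.equals(key));
    }

    // Iterates over all headers; the view is read-only.
    @Override
    public Iterator<DemoHeader> iterator() {
        return Collections.unmodifiableList(list).iterator();
    }
}
```

Naive get/set code then reads as headers.header("someKey") and headers.add(new DemoHeader("someKey", value)), while multi-value users go through headers("someKey").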
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Becket, Comments inline. On Sat, Feb 25, 2017 at 10:33 PM, Becket Qin wrote: > > 1. Regarding the mutability. > > I think it would be a big convenience to have headers mutable during > certain stage in the message life cycle for the use cases you mentioned. I > agree there is a material benefit especially given that we may have to > modify the headers for each message. > > That said, I also think it is fair to say that in the producer, in order to > guarantee the correctness of the entire logic, it is necessary that at some > point we need to make producer record immutable. For example we probably > don't want to see that users accidentally updated the headers when the > producer is doing the serialization or compression. > > Given that, would it be possible to make Headers to be able to switch from > mutable to immutable? We have done this for the Batch in the producer. For > example, initially the headers are mutable, but after it has gone through > all the interceptors, we can call Headers.close() to make it immutable > afterwards. > The difference is that the batch is an internal class that is not exposed to users. Can you please explain what happens if a user tries to send the same ProducerRecord twice? Would an interceptor fail when trying to mutate the header that is now closed? Or did you have something else in mind? Thanks, Ismael
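The double-send question can be illustrated with a small simulation. Everything below is a stand-in: ClosableHeaders, DoubleSendDemo.send() and the interceptor are hypothetical and only mimic the lifecycle being discussed (interceptors run, then the headers are closed), assuming that mutating closed headers throws IllegalStateException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical headers that can be switched from mutable to immutable.
final class ClosableHeaders {
    private final List<String> keys = new ArrayList<>();
    private boolean closed;

    void add(String key) {
        if (closed) {
            throw new IllegalStateException("headers are closed");
        }
        keys.add(key);
    }

    void close() {
        closed = true;
    }

    int size() {
        return keys.size();
    }
}

final class DoubleSendDemo {
    // Stand-in for producer.send(): run every interceptor, then freeze the headers.
    static void send(ClosableHeaders headers, List<Consumer<ClosableHeaders>> interceptors) {
        for (Consumer<ClosableHeaders> interceptor : interceptors) {
            interceptor.accept(headers);
        }
        headers.close();
    }

    public static void main(String[] args) {
        Consumer<ClosableHeaders> addTraceId = h -> h.add("trace-id");
        List<Consumer<ClosableHeaders>> interceptors = Collections.singletonList(addTraceId);
        ClosableHeaders headers = new ClosableHeaders();

        send(headers, interceptors);          // first send: interceptor adds a header
        try {
            send(headers, interceptors);      // second send: headers are already closed
        } catch (IllegalStateException e) {
            System.out.println("second send failed: " + e.getMessage());
        }
    }
}
```

Under these assumptions the second send fails inside the interceptor, which is the behaviour that would need to be called out in the compatibility notes.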
Re: [DISCUSS] KIP-82 - Add Record Headers
So on mutability, and on headers being at message level only, it seems we're all agreed then. On Radai's comments. 1) Agreed - KIP updated. 2) Now I totally get the nasty code that having this would create, as noted by your example. And obviously we want an API in which boilerplate code is avoided and the most common interaction is supported. I'm not sure what we should do here: in the protocol, and as per the previous discussion, the compromise was to support multiple values for a key. I believe this was Gwen's request, and the compromise was accepted. Now the Guava Multimap interface has "Collection get(K key)"; there is no single "V get(K key)" styled method. The org.apache.collections variant is the same. Two ideas I had to address this would be: A) add two additional methods, something like /** * Returns the first header for a key if present, else returns null. */ Header first(K key) /** * Replaces an existing header where the key and value for the old equal an existing header. */ replace(Header old, Header new) B) Revisit the previous agreement to support multiple values for a key (aka multimap); if someone wants a collection for their value they should encode this in their value byte[]. My personal opinion is that option B is a lot cleaner and will make for a cleaner interface. But it will mean reneging on a previous discussion agreement. Would everyone be happy with that? If not, Radai, is option A ok with you? Or any other ideas? Cheers Mike From: Jason Gustafson Sent: Tuesday, February 28, 2017 1:38 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-82 - Add Record Headers If I understand correctly, the suggestion is to let headers be mutable on the producer side basically until after they've passed through the interceptors. That sounds like a reasonable compromise to me. @Becket 3. It might be useful to have headers at MessageSet level as well so we can > avoid decompression in some cases. 
But given this KIP is already > complicated, I would rather leave this out of the scope and address that > later when needed, e.g. after having batch level interceptors. Yeah, I had the same thought. I was considering factoring the map of header names to generated integer ids into the message set and only using the integer ids in the individual messages. It's a bit complex though, so I agree it's probably best left out. I guess for now if users have a lot of headers, they should just enable compression. -Jason On Mon, Feb 27, 2017 at 1:16 PM, radai wrote: > a few comments on the KIP as it is now: > > 1. instead of add(Header) + add (Iterable) i suggest we use add + > addAll. this is more in line with how java collections work and may > therefor be more intuitive > > 2. common user code dealing with headers will want get("someKey") / > set("someKey"), or equivalent. code using multiple headers under the same > key will be rare, and code iterating over all headers would be even rarer > (probably only for kafka-connect equivalent use cases, really). as the API > looks right now, the most common and trivial cases will be gnarly: >get("someKey") --> > record.headers().headers("someKey").iterator().next().value(). and this is > before i start talking about how nulls/emptys are handled. >replace("someKey") --> > record.headers().remove(record.headers().headers("someKey")); > record.headers().append(new Header("someKey", value)); > > this is why i think we should start with get()/set() which are single-value > map semantics (so set overwrites), then add getAll() (multi-map), append() > etc on top. make the common case pretty. > > On Sun, Feb 26, 2017 at 2:01 AM, Michael Pearce > wrote: > > > Hi Becket, > > > > On 1) > > > > Yes truly we wanted mutable headers also. Alas we couldn't think of a > > solution would address Jason's point around, once a record is sent it > > shouldn't be possible to mutate it, for cases where you send twice the > same > > record. 
> > > > Thank you so much for your solution i think this will work very nicely :) > > > > Agreed we only need to do mutable to immutable conversion > > > > I think you solution with a ".close()" taken from else where in the kafka > > protocol where mutability is existent is a great solution, and happy > middle > > ground. > > > > @Jason you agree, this resolves your concerns if we had mutable headers? > > > > > > On 2) > > Agreed, this was only added as i couldn't think of a solution to that > > would address Jason's concern, but really didn't want to force end users > to > > constantly write ugly boiler plate code. If we agr
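Option A from the top of this message (a first(key) lookup plus a replace of one header by another) could look roughly like the sketch below. MultiHeader and MultiHeaders are hypothetical names, the String key and void return type are assumptions, and headers are kept in a plain list so that multiple values per key remain possible.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical header with value-based equality so replace() can locate it.
final class MultiHeader {
    final String key;
    final byte[] value;

    MultiHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MultiHeader)) return false;
        MultiHeader other = (MultiHeader) o;
        return key.equals(other.key) && Arrays.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return 31 * key.hashCode() + Arrays.hashCode(value);
    }
}

// Hypothetical multi-value headers container offering the two option A methods.
final class MultiHeaders {
    private final List<MultiHeader> headers = new ArrayList<>();

    void append(MultiHeader header) { headers.add(header); }

    // Returns the first header for the key if present, else null.
    MultiHeader first(String key) {
        for (MultiHeader h : headers) {
            if (h.key.equals(key)) return h;
        }
        return null;
    }

    // Replaces the first header equal to oldHeader in place; does nothing if it is absent.
    void replace(MultiHeader oldHeader, MultiHeader newHeader) {
        int index = headers.indexOf(oldHeader);
        if (index >= 0) headers.set(index, newHeader);
    }

    int size() { return headers.size(); }
}

final class MultiHeadersDemo {
    static byte[] utf8(String s) { return s.getBytes(StandardCharsets.UTF_8); }

    public static void main(String[] args) {
        MultiHeaders headers = new MultiHeaders();
        headers.append(new MultiHeader("someKey", utf8("v1")));
        headers.append(new MultiHeader("someKey", utf8("v2")));

        // The common "get then overwrite" case becomes two calls.
        MultiHeader current = headers.first("someKey");
        headers.replace(current, new MultiHeader("someKey", utf8("v3")));
        System.out.println(new String(headers.first("someKey").value, StandardCharsets.UTF_8));
    }
}
```

The sketch keeps the second "someKey" value untouched, which shows the cost of option A: callers still have to reason about multiple values, whereas option B would remove that case entirely.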
Re: [DISCUSS] KIP-82 - Add Record Headers
If I understand correctly, the suggestion is to let headers be mutable on the producer side basically until after they've passed through the interceptors. That sounds like a reasonable compromise to me. @Becket 3. It might be useful to have headers at MessageSet level as well so we can > avoid decompression in some cases. But given this KIP is already > complicated, I would rather leave this out of the scope and address that > later when needed, e.g. after having batch level interceptors. Yeah, I had the same thought. I was considering factoring the map of header names to generated integer ids into the message set and only using the integer ids in the individual messages. It's a bit complex though, so I agree it's probably best left out. I guess for now if users have a lot of headers, they should just enable compression. -Jason On Mon, Feb 27, 2017 at 1:16 PM, radai wrote: > a few comments on the KIP as it is now: > > 1. instead of add(Header) + add (Iterable) i suggest we use add + > addAll. this is more in line with how java collections work and may > therefor be more intuitive > > 2. common user code dealing with headers will want get("someKey") / > set("someKey"), or equivalent. code using multiple headers under the same > key will be rare, and code iterating over all headers would be even rarer > (probably only for kafka-connect equivalent use cases, really). as the API > looks right now, the most common and trivial cases will be gnarly: >get("someKey") --> > record.headers().headers("someKey").iterator().next().value(). and this is > before i start talking about how nulls/emptys are handled. >replace("someKey") --> > record.headers().remove(record.headers().headers("someKey")); > record.headers().append(new Header("someKey", value)); > > this is why i think we should start with get()/set() which are single-value > map semantics (so set overwrites), then add getAll() (multi-map), append() > etc on top. make the common case pretty. 
> > On Sun, Feb 26, 2017 at 2:01 AM, Michael Pearce > wrote: > > > Hi Becket, > > > > On 1) > > > > Yes truly we wanted mutable headers also. Alas we couldn't think of a > > solution would address Jason's point around, once a record is sent it > > shouldn't be possible to mutate it, for cases where you send twice the > same > > record. > > > > Thank you so much for your solution i think this will work very nicely :) > > > > Agreed we only need to do mutable to immutable conversion > > > > I think you solution with a ".close()" taken from else where in the kafka > > protocol where mutability is existent is a great solution, and happy > middle > > ground. > > > > @Jason you agree, this resolves your concerns if we had mutable headers? > > > > > > On 2) > > Agreed, this was only added as i couldn't think of a solution to that > > would address Jason's concern, but really didn't want to force end users > to > > constantly write ugly boiler plate code. If we agree on you solution for > 1, > > very happy to remove these. > > > > On 3) > > I also would like to keep the scope of this KIP limited to Message > Headers > > for now, else we run the risk of not getting even these delivered for > next > > release and we're almost now there on getting this KIP to the state > > everyone is happy. As you note address that later if theres the need. > > > > > > Ill leave it 24hrs and update the kip if no strong objections based on > > your solution for 1 & 2. > > > > Cheers > > Mike > > > > __ __ > > From: Becket Qin > > Sent: Saturday, February 25, 2017 10:33 PM > > To: dev@kafka.apache.org > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > Hey Michael, > > > > Thanks for the KIP. It looks good overall and it looks we only have few > > things to agree on. > > > > 1. Regarding the mutability. > > > > I think it would be a big convenience to have headers mutable during > > certain stage in the message life cycle for the use cases you mentioned. 
> I > > agree there is a material benefit especially given that we may have to > > modify the headers for each message. > > > > That said, I also think it is fair to say that in the producer, in order > to > > guarantee the correctness of the entire logic, it is necessary that at > some > > point we need to make producer record immutable. For example we probably > > don't wa
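Although the thread agrees to leave it out of scope, the idea of factoring header names into the message set and using generated integer ids in individual messages can be illustrated with a small sketch. HeaderNameDictionary is a hypothetical name and the id assignment scheme (next free index) is an assumption; nothing here reflects an actual Kafka wire format.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical batch-level dictionary: each distinct header name is stored once,
// and individual messages refer to it by a small generated integer id.
final class HeaderNameDictionary {
    private final Map<String, Integer> idsByName = new LinkedHashMap<>();
    private final List<String> namesById = new ArrayList<>();

    // Returns the existing id for a name, or assigns the next free id.
    int idFor(String name) {
        Integer existing = idsByName.get(name);
        if (existing != null) return existing;
        int id = namesById.size();
        idsByName.put(name, id);
        namesById.add(name);
        return id;
    }

    // Resolves an id back to its header name when reading a message.
    String nameFor(int id) { return namesById.get(id); }

    int size() { return namesById.size(); }
}

final class HeaderNameDictionaryDemo {
    public static void main(String[] args) {
        HeaderNameDictionary dictionary = new HeaderNameDictionary();
        String[][] perMessageHeaderNames = {
            {"trace-id", "tenant"},
            {"trace-id", "tenant", "retry-count"},
        };
        for (String[] names : perMessageHeaderNames) {
            StringBuilder ids = new StringBuilder();
            for (String name : names) {
                ids.append(dictionary.idFor(name)).append(' ');
            }
            System.out.println("message header ids: " + ids.toString().trim());
        }
        System.out.println("distinct names stored once: " + dictionary.size());
    }
}
```

The extra bookkeeping on both the write and read paths is the complexity referred to above; batch compression gets much of the same saving for repeated names without any format change.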
Re: [DISCUSS] KIP-82 - Add Record Headers
a few comments on the KIP as it is now:

1. instead of add(Header) + add(Iterable) i suggest we use add + addAll. this is more in line with how java collections work and may therefore be more intuitive

2. common user code dealing with headers will want get("someKey") / set("someKey"), or equivalent. code using multiple headers under the same key will be rare, and code iterating over all headers would be even rarer (probably only for kafka-connect equivalent use cases, really). as the API looks right now, the most common and trivial cases will be gnarly:

    get("someKey") --> record.headers().headers("someKey").iterator().next().value()

and this is before i start talking about how nulls/empties are handled.

    replace("someKey") --> record.headers().remove(record.headers().headers("someKey"));
                           record.headers().append(new Header("someKey", value));

this is why i think we should start with get()/set() which are single-value map semantics (so set overwrites), then add getAll() (multi-map), append() etc on top. make the common case pretty.

On Sun, Feb 26, 2017 at 2:01 AM, Michael Pearce wrote:

> Hi Becket,
>
> On 1)
>
> Yes truly we wanted mutable headers also. Alas we couldn't think of a
> solution would address Jason's point around, once a record is sent it
> shouldn't be possible to mutate it, for cases where you send twice the same
> record.
>
> Thank you so much for your solution i think this will work very nicely :)
>
> Agreed we only need to do mutable to immutable conversion
>
> I think you solution with a ".close()" taken from else where in the kafka
> protocol where mutability is existent is a great solution, and happy middle
> ground.
>
> @Jason you agree, this resolves your concerns if we had mutable headers?
>
>
> On 2)
> Agreed, this was only added as i couldn't think of a solution to that
> would address Jason's concern, but really didn't want to force end users to
> constantly write ugly boiler plate code.
If we agree on you solution for 1, > very happy to remove these. > > On 3) > I also would like to keep the scope of this KIP limited to Message Headers > for now, else we run the risk of not getting even these delivered for next > release and we're almost now there on getting this KIP to the state > everyone is happy. As you note address that later if theres the need. > > > Ill leave it 24hrs and update the kip if no strong objections based on > your solution for 1 & 2. > > Cheers > Mike > > __ __ > From: Becket Qin > Sent: Saturday, February 25, 2017 10:33 PM > To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > Hey Michael, > > Thanks for the KIP. It looks good overall and it looks we only have few > things to agree on. > > 1. Regarding the mutability. > > I think it would be a big convenience to have headers mutable during > certain stage in the message life cycle for the use cases you mentioned. I > agree there is a material benefit especially given that we may have to > modify the headers for each message. > > That said, I also think it is fair to say that in the producer, in order to > guarantee the correctness of the entire logic, it is necessary that at some > point we need to make producer record immutable. For example we probably > don't want to see that users accidentally updated the headers when the > producer is doing the serialization or compression. > > Given that, would it be possible to make Headers to be able to switch from > mutable to immutable? We have done this for the Batch in the producer. For > example, initially the headers are mutable, but after it has gone through > all the interceptors, we can call Headers.close() to make it immutable > afterwards. > > On the consumer side, we can probably always leave the the ConsumerRecord > mutable because after we give the messages to the users, Kafka consumer > itself does not care about whether the headers are modified or not anymore. 
> > So far I think we only need to do the mutable to immutable conversion. If > there are use case require immutable to mutable conversion, we may need > something more than a closable. > > 2. If we agree on what mentioned above, I think it probably makes sense to > put the addHeaders()/removeHeaders() methods into Headers class and just > leave the headers() method in ProducerRecord and ConsumerRecord. > > 3. It might be useful to have headers at MessageSet level as well so we can > avoid decompression in some cases. But given this KIP is already > complicated, I would rather leave this out of the scope and address that > later when needed, e.g
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Becket,

On 1)

Yes, truly we wanted mutable headers also. Alas, we couldn't think of a solution that would address Jason's point that once a record is sent it shouldn't be possible to mutate it, for cases where you send the same record twice.

Thank you so much for your solution, I think this will work very nicely :)

Agreed, we only need to do the mutable to immutable conversion.

I think your solution with a ".close()", taken from elsewhere in the Kafka protocol where mutability exists, is a great solution and a happy middle ground.

@Jason do you agree this resolves your concerns about having mutable headers?

On 2)

Agreed, this was only added as I couldn't think of a solution that would address Jason's concern, but I really didn't want to force end users to constantly write ugly boilerplate code. If we agree on your solution for 1, I'm very happy to remove these.

On 3)

I also would like to keep the scope of this KIP limited to Message Headers for now, else we run the risk of not getting even these delivered for the next release, and we're almost there on getting this KIP to a state everyone is happy with. As you note, we can address that later if there's the need.

I'll leave it 24hrs and update the KIP, if there are no strong objections, based on your solution for 1 & 2.

Cheers
Mike

From: Becket Qin
Sent: Saturday, February 25, 2017 10:33 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

Hey Michael,

Thanks for the KIP. It looks good overall and it looks we only have few things to agree on.

1. Regarding the mutability.

I think it would be a big convenience to have headers mutable during certain stage in the message life cycle for the use cases you mentioned. I agree there is a material benefit especially given that we may have to modify the headers for each message.

That said, I also think it is fair to say that in the producer, in order to guarantee the correctness of the entire logic, it is necessary that at some point we need to make producer record immutable.
For example we probably don't want to see that users accidentally updated the headers when the producer is doing the serialization or compression. Given that, would it be possible to make Headers to be able to switch from mutable to immutable? We have done this for the Batch in the producer. For example, initially the headers are mutable, but after it has gone through all the interceptors, we can call Headers.close() to make it immutable afterwards. On the consumer side, we can probably always leave the the ConsumerRecord mutable because after we give the messages to the users, Kafka consumer itself does not care about whether the headers are modified or not anymore. So far I think we only need to do the mutable to immutable conversion. If there are use case require immutable to mutable conversion, we may need something more than a closable. 2. If we agree on what mentioned above, I think it probably makes sense to put the addHeaders()/removeHeaders() methods into Headers class and just leave the headers() method in ProducerRecord and ConsumerRecord. 3. It might be useful to have headers at MessageSet level as well so we can avoid decompression in some cases. But given this KIP is already complicated, I would rather leave this out of the scope and address that later when needed, e.g. after having batch level interceptors. Thanks, Jiangjie (Becket) Qin On Fri, Feb 24, 2017 at 3:56 PM, Michael Pearce wrote: > KIP updated in response to the below comments: > >> 1. Is the intent of `Headers.filter` to include or exclude the > headers > > matching the key? Can you add a javadoc to clarify? > > 2. The KIP mentions that we will introduce V4 of FetchRequest > and V4 of > > ProduceRequest. Can you change this to say that the changes will > > piggyback > > onto V3 of ProduceRequest and V4 of FetchRequest which were > introduced > > in > > KIP-98? 
> > > > > On 24/02/2017, 23:20, "Michael Pearce" wrote: > > We’re trying to make an eco-system for people to be able to use > headers, I think we want to ensure some least common features are supported > and not limited. > > > Some examples we have already. > > On consume interceptors a security interceptor may need to take the > current header, decrypt the data and replace the token with the next token > for the next processing, in case of a single decryption token being one > time use only. > > On produce it could be the interceptors add some values in the clear > from the systems that supply them, but later a security header interceptor > needs to encrypt some headers, as such needs to replace the current value > with new one. > > I note Radai already request
Re: [DISCUSS] KIP-82 - Add Record Headers
sumerRecord. If they only save some boilerplate, I'd just > as well > > not > > have them. > > > > Also a couple minor comments: > > > > 1. Is the intent of `Headers.filter` to include or exclude > the headers > > matching the key? Can you add a javadoc to clarify? > > 2. The KIP mentions that we will introduce V4 of > FetchRequest and V4 of > > ProduceRequest. Can you change this to say that the changes > will > > piggyback > > onto V3 of ProduceRequest and V4 of FetchRequest which were > introduced > > in > > KIP-98? > > > > The rest of the KIP looks good to me. > > > > -Jason > > > > On Fri, Feb 24, 2017 at 12:46 PM, Michael Pearce < > > michael.pea...@ig.com> > > wrote: > > > > > I’ve added the methods on the ProducerRecord that will > return a new > > > instance of ProducerRecord with modified headers. > > > > > > On 24/02/2017, 19:22, "Michael Pearce" < > michael.pea...@ig.com> > > wrote: > > > > > > Pattern.compile is expensive, and even if cached > String.equals is > > > faster than matched. also if we end up with an internal > map in > > future for > > > performance it will be easier to be by key. > > > > > > As all that's needed is to get header by key. > > > > > > With like the other arguements of let's implement > simple and > > then we > > > can always add pattern later as well if it's found it's > needed. (As > > noted > > > it's easier to add methods than to take away) > > > > > > Great I'll update kip with extra methods on > producerecord and a > > note > > > that new objects are returned by method calls. > > > > > > > > > > > > Sent using OWA for iPhone > > > > > > From: Jason Gustafson > > > Sent: Friday, February 24, 2017 6:51:45 PM > > > To: dev@kafka.apache.org > > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > > > The APIs in the current KIP look good to me. Just a > couple > > questions: > > > why > > > does filter not return Headers? Also would it be > useful if the > > key is a > > > regex? 
> > > > > > On the point of immutability.. One option might be to > use a > > mutable > > > object > > > only when passing the headers through the interceptor > chain. I > > think as > > > long as we resort to mutability only when clear > performance > > results > > > show > > > that it is worthwhile, I am satisfied. As Ismael > noted, for > > common > > > scenarios it is possible to get reasonable performance > with > > immutable > > > objects. > > > > > > -Jason > > > > > > On Fri, Feb 24, 2017 at 8:48 AM, Michael Pearce < > > michael.pea...@ig.com > > > > > > > wrote: > > > > > > > Hi > > > > > > > > On 1, How can you guarantee two separate > implemented clients > > would > > > add > > > > the headers in the same order we are not specifying > an order > > at the > > > > protocol level (nor should we) with regards to keyA > being > > ordered > > > before > > > > keyB? We shouldn’t be expecting keyA to be always > set before > > keyB. > > >
Re: [DISCUSS] KIP-82 - Add Record Headers
KIP updated in response to the below comments: > 1. Is the intent of `Headers.filter` to include or exclude the headers > matching the key? Can you add a javadoc to clarify? > 2. The KIP mentions that we will introduce V4 of FetchRequest and V4 of > ProduceRequest. Can you change this to say that the changes will > piggyback > onto V3 of ProduceRequest and V4 of FetchRequest which were introduced > in > KIP-98? On 24/02/2017, 23:20, "Michael Pearce" wrote: We’re trying to make an eco-system for people to be able to use headers, I think we want to ensure some least common features are supported and not limited. Some examples we have already. On consume interceptors a security interceptor may need to take the current header, decrypt the data and replace the token with the next token for the next processing, in case of a single decryption token being one time use only. On produce it could be the interceptors add some values in the clear from the systems that supply them, but later a security header interceptor needs to encrypt some headers, as such needs to replace the current value with new one. I note Radai already requested this in the thread, I assume he has some use case also. S Simple add / remove is a least common feature. Rgds, Mike On 24/02/2017, 23:00, "Jason Gustafson" wrote: Hey Michael, I'm not strongly opposed to them; I just don't see a lot of benefit. One thing it would be good to understand is why a consumer interceptor would need to add headers and why a producer interceptor would need to remove them. Maybe we only need the common cases? Thanks, Jason On Fri, Feb 24, 2017 at 2:22 PM, Michael Pearce wrote: > Hi Jason, > > Sorry I thought this was the agreed compromise to provide an api that > avoid boiler plate in return for immutabilty. > > If not then mutability will be needed as a goal is to have a single clean > method call to append/remove a header. 
> > Cheers > Mike > > On 24/02/2017, 22:15, "Jason Gustafson" wrote: > > Hey Michael, > > I didn't actually comment on the new methods for ProducerRecord and > ConsumerRecord. If they only save some boilerplate, I'd just as well > not > have them. > > Also a couple minor comments: > > 1. Is the intent of `Headers.filter` to include or exclude the headers > matching the key? Can you add a javadoc to clarify? > 2. The KIP mentions that we will introduce V4 of FetchRequest and V4 of > ProduceRequest. Can you change this to say that the changes will > piggyback > onto V3 of ProduceRequest and V4 of FetchRequest which were introduced > in > KIP-98? > > The rest of the KIP looks good to me. > > -Jason > > On Fri, Feb 24, 2017 at 12:46 PM, Michael Pearce < > michael.pea...@ig.com> > wrote: > > > I’ve added the methods on the ProducerRecord that will return a new > > instance of ProducerRecord with modified headers. > > > > On 24/02/2017, 19:22, "Michael Pearce" > wrote: > > > > Pattern.compile is expensive, and even if cached String.equals is > > faster than matched. also if we end up with an internal map in > future for > > performance it will be easier to be by key. > > > > As all that's needed is to get header by key. > > > > With like the other arguements of let's implement simple and > then we > > can always add pattern later as well if it's found it's needed. (As > noted > > it's easier to add methods than to take away) > > > > Great I'll update kip with extra methods on producerecord and a > note > > that new objects are returned by method calls. > > > > > > > > Sent using OWA for iPhone > > > > From: Jason Gustafson > > S
Re: [DISCUSS] KIP-82 - Add Record Headers
We’re trying to make an eco-system for people to be able to use headers, I think we want to ensure some least common features are supported and not limited. Some examples we have already. On consume interceptors a security interceptor may need to take the current header, decrypt the data and replace the token with the next token for the next processing, in case of a single decryption token being one time use only. On produce it could be the interceptors add some values in the clear from the systems that supply them, but later a security header interceptor needs to encrypt some headers, as such needs to replace the current value with new one. I note Radai already requested this in the thread, I assume he has some use case also. S Simple add / remove is a least common feature. Rgds, Mike On 24/02/2017, 23:00, "Jason Gustafson" wrote: Hey Michael, I'm not strongly opposed to them; I just don't see a lot of benefit. One thing it would be good to understand is why a consumer interceptor would need to add headers and why a producer interceptor would need to remove them. Maybe we only need the common cases? Thanks, Jason On Fri, Feb 24, 2017 at 2:22 PM, Michael Pearce wrote: > Hi Jason, > > Sorry I thought this was the agreed compromise to provide an api that > avoid boiler plate in return for immutabilty. > > If not then mutability will be needed as a goal is to have a single clean > method call to append/remove a header. > > Cheers > Mike > > On 24/02/2017, 22:15, "Jason Gustafson" wrote: > > Hey Michael, > > I didn't actually comment on the new methods for ProducerRecord and > ConsumerRecord. If they only save some boilerplate, I'd just as well > not > have them. > > Also a couple minor comments: > > 1. Is the intent of `Headers.filter` to include or exclude the headers > matching the key? Can you add a javadoc to clarify? > 2. The KIP mentions that we will introduce V4 of FetchRequest and V4 of > ProduceRequest. 
Can you change this to say that the changes will > piggyback > onto V3 of ProduceRequest and V4 of FetchRequest which were introduced > in > KIP-98? > > The rest of the KIP looks good to me. > > -Jason > > On Fri, Feb 24, 2017 at 12:46 PM, Michael Pearce < > michael.pea...@ig.com> > wrote: > > > I’ve added the methods on the ProducerRecord that will return a new > > instance of ProducerRecord with modified headers. > > > > On 24/02/2017, 19:22, "Michael Pearce" > wrote: > > > > Pattern.compile is expensive, and even if cached String.equals is > > faster than matched. also if we end up with an internal map in > future for > > performance it will be easier to be by key. > > > > As all that's needed is to get header by key. > > > > With like the other arguements of let's implement simple and > then we > > can always add pattern later as well if it's found it's needed. (As > noted > > it's easier to add methods than to take away) > > > > Great I'll update kip with extra methods on producerecord and a > note > > that new objects are returned by method calls. > > > > > > > > Sent using OWA for iPhone > > > > From: Jason Gustafson > > Sent: Friday, February 24, 2017 6:51:45 PM > > To: dev@kafka.apache.org > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > The APIs in the current KIP look good to me. Just a couple > questions: > > why > > does filter not return Headers? Also would it be useful if the > key is a > > regex? > > > > On the point of immutability.. One option might be to use a > mutable > > object > > only when passing the headers through the interceptor chain. I > think as > > long as we resort to mutability only when clear performance > results > > show > > that it is worthwhile, I am satisfied. As Ismael noted, for > common > > scenarios it is possible to get reasonable performance with
Re: [DISCUSS] KIP-82 - Add Record Headers
Hey Michael, I'm not strongly opposed to them; I just don't see a lot of benefit. One thing it would be good to understand is why a consumer interceptor would need to add headers and why a producer interceptor would need to remove them. Maybe we only need the common cases? Thanks, Jason On Fri, Feb 24, 2017 at 2:22 PM, Michael Pearce wrote: > Hi Jason, > > Sorry I thought this was the agreed compromise to provide an api that > avoid boiler plate in return for immutabilty. > > If not then mutability will be needed as a goal is to have a single clean > method call to append/remove a header. > > Cheers > Mike > > On 24/02/2017, 22:15, "Jason Gustafson" wrote: > > Hey Michael, > > I didn't actually comment on the new methods for ProducerRecord and > ConsumerRecord. If they only save some boilerplate, I'd just as well > not > have them. > > Also a couple minor comments: > > 1. Is the intent of `Headers.filter` to include or exclude the headers > matching the key? Can you add a javadoc to clarify? > 2. The KIP mentions that we will introduce V4 of FetchRequest and V4 of > ProduceRequest. Can you change this to say that the changes will > piggyback > onto V3 of ProduceRequest and V4 of FetchRequest which were introduced > in > KIP-98? > > The rest of the KIP looks good to me. > > -Jason > > On Fri, Feb 24, 2017 at 12:46 PM, Michael Pearce < > michael.pea...@ig.com> > wrote: > > > I’ve added the methods on the ProducerRecord that will return a new > > instance of ProducerRecord with modified headers. > > > > On 24/02/2017, 19:22, "Michael Pearce" > wrote: > > > > Pattern.compile is expensive, and even if cached String.equals is > > faster than matched. also if we end up with an internal map in > future for > > performance it will be easier to be by key. > > > > As all that's needed is to get header by key. > > > > With like the other arguements of let's implement simple and > then we > > can always add pattern later as well if it's found it's needed. 
(As > noted > > it's easier to add methods than to take away) > > > > Great I'll update kip with extra methods on producerecord and a > note > > that new objects are returned by method calls. > > > > > > > > Sent using OWA for iPhone > > > > From: Jason Gustafson > > Sent: Friday, February 24, 2017 6:51:45 PM > > To: dev@kafka.apache.org > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > The APIs in the current KIP look good to me. Just a couple > questions: > > why > > does filter not return Headers? Also would it be useful if the > key is a > > regex? > > > > On the point of immutability.. One option might be to use a > mutable > > object > > only when passing the headers through the interceptor chain. I > think as > > long as we resort to mutability only when clear performance > results > > show > > that it is worthwhile, I am satisfied. As Ismael noted, for > common > > scenarios it is possible to get reasonable performance with > immutable > > objects. > > > > -Jason > > > > On Fri, Feb 24, 2017 at 8:48 AM, Michael Pearce < > michael.pea...@ig.com > > > > > wrote: > > > > > Hi > > > > > > On 1, How can you guarantee two separate implemented clients > would > > add > > > the headers in the same order we are not specifying an order > at the > > > protocol level (nor should we) with regards to keyA being > ordered > > before > > > keyB? We shouldn’t be expecting keyA to be always set before > keyB. > > > > > > On 2, I believe we have changed the naming based on feedback > from > > Jason > > > already, e.g. we don’t have “get” method that inferred O(1) > > performance, > > > like wise nor “put” but we have an “append” > > > > > > On 3, in the KafkaProducer, I think we have mutability > already, the > > value > > > for time is changed if it is null, at the point o
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Jason,

Sorry, I thought this was the agreed compromise: provide an API that avoids boilerplate in return for immutability. If not, then mutability will be needed, as a goal is to have a single clean method call to append/remove a header.

Cheers
Mike
Re: [DISCUSS] KIP-82 - Add Record Headers
Hey Michael,

I didn't actually comment on the new methods for ProducerRecord and ConsumerRecord. If they only save some boilerplate, I'd just as well not have them.

Also a couple minor comments:

1. Is the intent of `Headers.filter` to include or exclude the headers matching the key? Can you add a javadoc to clarify?
2. The KIP mentions that we will introduce V4 of FetchRequest and V4 of ProduceRequest. Can you change this to say that the changes will piggyback onto V3 of ProduceRequest and V4 of FetchRequest which were introduced in KIP-98?

The rest of the KIP looks good to me.

-Jason
Re: [DISCUSS] KIP-82 - Add Record Headers
I’ve added the methods on the ProducerRecord that will return a new instance of ProducerRecord with modified headers.
Re: [DISCUSS] KIP-82 - Add Record Headers
Pattern.compile is expensive, and even if the compiled pattern is cached, String.equals is faster than a regex match. Also, if we end up with an internal map in future for performance, it will be easier if lookup is by key.

All that's needed is to get a header by key.

As with the other arguments, let's implement it simply; we can always add a pattern variant later if it's found to be needed. (As noted, it's easier to add methods than to take them away.)

Great, I'll update the KIP with the extra methods on ProducerRecord and a note that new objects are returned by method calls.

Sent using OWA for iPhone

From: Jason Gustafson
Sent: Friday, February 24, 2017 6:51:45 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

The APIs in the current KIP look good to me. Just a couple questions: why does filter not return Headers? Also would it be useful if the key is a regex?

On the point of immutability.. One option might be to use a mutable object only when passing the headers through the interceptor chain. I think as long as we resort to mutability only when clear performance results show that it is worthwhile, I am satisfied. As Ismael noted, for common scenarios it is possible to get reasonable performance with immutable objects.

-Jason

On Fri, Feb 24, 2017 at 8:48 AM, Michael Pearce wrote:

> Hi
>
> On 1, how can you guarantee two separately implemented clients would add the headers in the same order? We are not specifying an order at the protocol level (nor should we) with regards to keyA being ordered before keyB. We shouldn’t be expecting keyA to be always set before keyB.
>
> On 2, I believe we have changed the naming based on feedback from Jason already, e.g. we don’t have a “get” method that implied O(1) performance, likewise nor “put”, but we have an “append”.
>
> On 3, in the KafkaProducer, I think we have mutability already; the value for time is changed if it is null, at the point of send:
>
>     long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
>
> As such the timestamp is already mutable, so what’s the difference here? We already have some mixed semantics on timestamp. E.g. currently if I send two records with timestamp not set, the wire binary sent for the timestamp would be different; as such we have mutation for the same record.
>
> On 4, I think we should expect not 1 or 2 headers, but in fact tens of headers. This is the concern with immutable headers: whilst the append self-reference works nicely, what if someone needs to remove a header?
>
> Trying to get this moving:
>
> If we really wanted immutable Headers, and essentially you guys won't give a +1 without it:
>
> What's the feeling for adding methods to ProducerRecord that do the boilerplate of creating a new ProducerRecord with the altered headers (appended or removed) inside? E.g.
>
> ProducerRecord {
>
>     ProducerRecord append(Iterable headersToAppend){
>         return new ProducerRecord(key, value, headers.append(headersToAppend), ….)
>     }
>
>     ProducerRecord remove(Iterable headersToRemove){
>         return new ProducerRecord(key, value, headers.remove(headersToRemove), ….)
>     }
>
> }
>
> Where the Headers methods actually return new objects, and the ProducerRecord methods create a new producer record with all the current values, but with the new modified headers.
>
> Then interceptors / code return this new object?
>
> Cheers
> Mike
>
> On 24/02/2017, 16:02, "isma...@gmail.com on behalf of Ismael Juma" <isma...@gmail.com on behalf of ism...@juma.me.uk> wrote:
>
> > Hi Michael,
> >
> > Did you mean that you were happy to compromise to keep it mutable or immutable? You wrote the former, but it sounded from the sentence that it could have been a typo. So, my thoughts on this is that there are a few things to take into account:
> >
> > 1. Semantics
> > 2. Simplicity of use (the common operations should be easy to do)
> > 3. If it's easy to reason about and safe (immutability helps with this)
> > 4. Efficiency (both memory and CPU usage)
> >
> > Regarding 1, I think it would be good to be very clear about the guarantees that we are providing. It seems that we are saying that keys are unordered, but what about the case where there are multiple values for the same key? It seems that for some use cases (e.g. lineage), it may be useful to add values to the same key while preserving the order.
> >
> > Regarding 2, I agree that it's useful to have methods in `Head
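The ProducerRecord append/remove proposal quoted above can be sketched roughly as follows. This is only an illustration under stated assumptions: `DemoHeader`, `DemoHeaders` and `DemoRecord` are hypothetical stand-ins, not Kafka's classes or the KIP's final interfaces, and `remove` is assumed here to take a header key rather than an Iterable of headers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a record header (key plus opaque bytes).
final class DemoHeader {
    final String key;
    final byte[] value;

    DemoHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Immutable header collection: append/remove return new instances.
final class DemoHeaders {
    private final List<DemoHeader> entries;

    DemoHeaders(List<DemoHeader> entries) {
        // Defensive copy so later changes to the caller's list are not visible here.
        this.entries = Collections.unmodifiableList(new ArrayList<>(entries));
    }

    DemoHeaders append(Iterable<DemoHeader> toAppend) {
        List<DemoHeader> copy = new ArrayList<>(entries);
        for (DemoHeader h : toAppend) {
            copy.add(h);
        }
        return new DemoHeaders(copy);
    }

    // Assumption: removal is by key and drops every header with that key.
    DemoHeaders remove(String key) {
        List<DemoHeader> kept = new ArrayList<>();
        for (DemoHeader h : entries) {
            if (!h.key.equals(key)) {
                kept.add(h);
            }
        }
        return new DemoHeaders(kept);
    }

    int size() {
        return entries.size();
    }
}

// Hypothetical stand-in for ProducerRecord, reduced to key, value and headers.
final class DemoRecord<K, V> {
    final K key;
    final V value;
    final DemoHeaders headers;

    DemoRecord(K key, V value, DemoHeaders headers) {
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    // Returns a new record; the receiver is left untouched.
    DemoRecord<K, V> append(Iterable<DemoHeader> toAppend) {
        return new DemoRecord<>(key, value, headers.append(toAppend));
    }

    DemoRecord<K, V> remove(String headerKey) {
        return new DemoRecord<>(key, value, headers.remove(headerKey));
    }
}
```

An interceptor written against this shape would simply return `record.append(...)`. The cost discussed in the thread is visible here: every call allocates a new record and copies the header list.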
Re: [DISCUSS] KIP-82 - Add Record Headers
On Wed, Feb 22, 2017 at 2:22 PM, Michael Pearce <michael.pea...@ig.com> wrote:

> If the argument for not having a map holding the key, value pairs is due to garbage creation of HashMap entries, forcing the creation of a whole new producer record to simply add a header surely creates a lot more?
>
> From: Jason Gustafson
> Sent: Wednesday, February 22, 2017 10:09 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
>
> The current producer interceptor API is this:
>
>     ProducerRecord onSend(ProducerRecord record);
>
> So adding a header means creating a new ProducerRecord with a new header added to the current headers and returning it. Would that not work?
>
> -Jason
>
> On Wed, Feb 22, 2017 at 1:45 PM, Michael Pearce <michael.pea...@ig.com> wrote:
>
> > So how would you have this work if not mutable where interceptors would add headers?
> >
> > Sent using OWA for iPhone
> >
> > From: Jason Gustafson
> > Sent: Wednesday, February 22, 2017 8:42:27 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
> >
> > I think the point on the mutability of Headers is worth discussing a little more. As far as I can tell, once the ProducerRecord (or ConsumerRecord) is constructed, there should be no need to further change the headers. Is that correct? If so, then why not enforce that that is the case through the API? One problem with mutability is that it constrains the implementation of Headers. For example, if we were backing with a byte slice, would we recopy the bytes if a header is added or would we maintain a satellite collection of added records? Seems not great either way. If we really think mutability is needed, perhaps we could add a method to headers to convert it to a mutable type (e.g. a Headers.toMap)?
> >
> > I'm also with Ismael about exposing Headers.get(). I thought it might make sense to have a method like this instead:
> >
> >     Iterable findMatching(Pattern pattern);
> >
> > This makes the (potential) need to scan the headers clear in the API. I'd also be fine exposing no gette
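To make the `findMatching(Pattern)` idea concrete, here is a minimal sketch of a list-backed header collection where both lookups are linear scans. `KeyedHeader`, `ScannedHeaders` and `withKey` are hypothetical names used only for illustration, and the methods return `List` (which is an `Iterable`) purely so the results are easy to inspect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical header type: a key plus opaque bytes.
final class KeyedHeader {
    final String key;
    final byte[] value;

    KeyedHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical list-backed headers; there is no map, so every lookup scans.
final class ScannedHeaders {
    private final List<KeyedHeader> entries;

    ScannedHeaders(List<KeyedHeader> entries) {
        this.entries = new ArrayList<>(entries);
    }

    // Regex lookup: each key is tested against the pattern, an O(n) scan.
    List<KeyedHeader> findMatching(Pattern pattern) {
        List<KeyedHeader> out = new ArrayList<>();
        for (KeyedHeader h : entries) {
            if (pattern.matcher(h.key).matches()) {
                out.add(h);
            }
        }
        return out;
    }

    // Exact-key lookup: the same scan, but a plain String.equals per entry.
    List<KeyedHeader> withKey(String key) {
        List<KeyedHeader> out = new ArrayList<>();
        for (KeyedHeader h : entries) {
            if (h.key.equals(key)) {
                out.add(h);
            }
        }
        return out;
    }
}
```

Neither method name suggests constant-time access, which is the naming concern raised in the thread. The exact-key variant avoids compiling and matching a pattern for what is the common case.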
Re: [DISCUSS] KIP-82 - Add Record Headers
So currently the KIP has:

append
filter (get by key, but without implying performance; as noted, a get method carries an expectation of O(1) performance)

I agree that supporting some form of remove/replace is standard in this kind of API, and for the benefit of making a usable API we should explore adding these.

If we’re avoiding the put/get-styled interface, as there seems to be contention over this naming, are we happy with naming these as I mentioned?
Re: [DISCUSS] KIP-82 - Add Record Headers
append-only would mean that if (for whatever reason) i want to replace a header or strip it out i'd need to copy the whole record.
Re: [DISCUSS] KIP-82 - Add Record Headers
Im happy to compromise to keep it mutable but move to an append style api. (as in guava interables concat) class Headers { Headers append(Iterable headers); } I don’t think we’d want prepend, this would give the idea of guaranteed ordering, when in actual fact we don’t provide that guarantee (.e.g one client can put headerA, then headerB, but another could put headerB then headerA, this shouldn’t cause issues), Also what if we changed to a hashmap for the internal implementation, its just a bucket of entries no ordering. I think we just need to provide an api to add/append headers. This ok? If so ill update KIP to record this. Cheers Mike On 23/02/2017, 00:37, "Jason Gustafson" wrote: The point about usability is fair. It's also reasonable to expect that common use cases such as appending headers should be done efficiently. Perhaps we could compromise with something like this? class Headers { Headers append(Iterable headers); Headers prepend(Iterable headers); } That retains ease of use while still giving ourselves some flexibility in the implementation. -Jason On Wed, Feb 22, 2017 at 3:03 PM, Michael Pearce wrote: > I wasn’t referring to the headers needing to be copied, im meaning the > fact we’d be forcing a new producer record to be created, with all the > contents copied. > > i.e what will happen is utility method will be created or end up being > used, which does this, and returns the new ProducerRecord instance. > > ProducerRecord addHeader(ProducerRecord record, Header header){ > Return New ProducerRecord(record.key, record.value, record.timestamp….., > record.headers.concat(header)) > } > > To me this seems ugly, but will be inevitable if we don’t make adding > headers to existing records a simple clean method call. > > > > On 22/02/2017, 22:57, "Michael Pearce" wrote: > > Lazy init can achieve/avoid that. 
> > Re the concat, why don’t we implement that inside the Headers rather > than causing everyone to implement this as adding headers in interceptors > will be a dominant use case. We want a user friendly API. Having as a user > having to code this instead of having the headers handle this for me seems > redundant. > > On 22/02/2017, 22:34, "Jason Gustafson" wrote: > > I thought the argument was against creating the extra objects > unnecessarily > (i.e. if they were not accessed). And note that making the Headers > immutable doesn't necessarily mean that they need to be copied: > you can do > a trick like Guava's Iterables.concat to add additional headers > without > changing the underlying collections. > > -Jason > > On Wed, Feb 22, 2017 at 2:22 PM, Michael Pearce < > michael.pea...@ig.com> > wrote: > > > If the argument for not having a map holding the key, value > pairs is due > > to garbage creation of HashMap entry's, forcing the creation of > a whole new > > producer record to simply add a head, surely is creating a-lot > more? > > ____________________ > > From: Jason Gustafson > > Sent: Wednesday, February 22, 2017 10:09 PM > > To: dev@kafka.apache.org > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > The current producer interceptor API is this: > > > > ProducerRecord onSend(ProducerRecord record); > > > > So adding a header means creating a new ProducerRecord with a > new header > > added to the current headers and returning it. Would that not > work? > > > > -Jason > > > > On Wed, Feb 22, 2017 at 1:45 PM, Michael Pearce < > michael.pea...@ig.com> > > wrote: > > > > > So how would you have this work if not mutable where > interceptors would > > > add headers? > > > > > > Sent using OWA for iPhone > > > > > > From: Jason Gustafson > > > Sent: Wednesday, February 22, 2017 8:42:27 PM > > > To: dev@kafka.apache.org > > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers >
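As an illustration of the append-style API proposed at the top of this message, here is a minimal sketch. It rests on assumptions: `Header` and `SimpleHeaders` are hypothetical stand-ins rather than the KIP's interfaces, the generic parameter on `Iterable` is assumed, and the list backing is only one possible internal representation (the message itself notes it could equally be a HashMap with no ordering).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a header: a String key and a byte[] value.
final class Header {
    final String key;
    final byte[] value;

    Header(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical mutable container that exposes only an append-style method.
final class SimpleHeaders implements Iterable<Header> {
    // Assumption: a list is used here; callers should not rely on ordering.
    private final List<Header> entries = new ArrayList<>();

    // Adds all given headers and returns this instance so calls can be chained.
    SimpleHeaders append(Iterable<Header> headers) {
        for (Header h : headers) {
            entries.add(h);
        }
        return this;
    }

    int size() {
        return entries.size();
    }

    // Read-only iteration over the current entries.
    @Override
    public Iterator<Header> iterator() {
        return Collections.unmodifiableList(entries).iterator();
    }
}
```

With something of this shape, an interceptor could add headers to an existing record in a single call instead of rebuilding the record.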
Re: [DISCUSS] KIP-82 - Add Record Headers
The point about usability is fair. It's also reasonable to expect that common use cases such as appending headers should be done efficiently. Perhaps we could compromise with something like this? class Headers { Headers append(Iterable headers); Headers prepend(Iterable headers); } That retains ease of use while still giving ourselves some flexibility in the implementation. -Jason On Wed, Feb 22, 2017 at 3:03 PM, Michael Pearce wrote: > I wasn’t referring to the headers needing to be copied, im meaning the > fact we’d be forcing a new producer record to be created, with all the > contents copied. > > i.e what will happen is utility method will be created or end up being > used, which does this, and returns the new ProducerRecord instance. > > ProducerRecord addHeader(ProducerRecord record, Header header){ > Return New ProducerRecord(record.key, record.value, record.timestamp….., > record.headers.concat(header)) > } > > To me this seems ugly, but will be inevitable if we don’t make adding > headers to existing records a simple clean method call. > > > > On 22/02/2017, 22:57, "Michael Pearce" wrote: > > Lazy init can achieve/avoid that. > > Re the concat, why don’t we implement that inside the Headers rather > than causing everyone to implement this as adding headers in interceptors > will be a dominant use case. We want a user friendly API. Having as a user > having to code this instead of having the headers handle this for me seems > redundant. > > On 22/02/2017, 22:34, "Jason Gustafson" wrote: > > I thought the argument was against creating the extra objects > unnecessarily > (i.e. if they were not accessed). And note that making the Headers > immutable doesn't necessarily mean that they need to be copied: > you can do > a trick like Guava's Iterables.concat to add additional headers > without > changing the underlying collections. 
> > -Jason > > On Wed, Feb 22, 2017 at 2:22 PM, Michael Pearce < > michael.pea...@ig.com> > wrote: > > > If the argument for not having a map holding the key, value > pairs is due > > to garbage creation of HashMap entry's, forcing the creation of > a whole new > > producer record to simply add a head, surely is creating a-lot > more? > > ____________ > > From: Jason Gustafson > > Sent: Wednesday, February 22, 2017 10:09 PM > > To: dev@kafka.apache.org > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > The current producer interceptor API is this: > > > > ProducerRecord onSend(ProducerRecord record); > > > > So adding a header means creating a new ProducerRecord with a > new header > > added to the current headers and returning it. Would that not > work? > > > > -Jason > > > > On Wed, Feb 22, 2017 at 1:45 PM, Michael Pearce < > michael.pea...@ig.com> > > wrote: > > > > > So how would you have this work if not mutable where > interceptors would > > > add headers? > > > > > > Sent using OWA for iPhone > > > > > > From: Jason Gustafson > > > Sent: Wednesday, February 22, 2017 8:42:27 PM > > > To: dev@kafka.apache.org > > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > > > I think the point on the mutability of Headers is worth > discussing a > > little > > > more. As far as I can tell, once the ProducerRecord (or > ConsumerRecord) > > is > > > constructed, there should be no need to further change the > headers. Is > > that > > > correct? If so, then why not enforce that that is the case > through the > > API? > > > One problem with mutability it that it constrains the > implementation of > > > Headers. For example, if we were backing with a byte slice, > would we > > recopy > > > the bytes if a header is added or would we maintain a satellite > > collection > > > of added records. Seems not great either way. 
If we really > think > > mutability > > > is needed, perhaps we could add a method to headers to convert > it to a > > > mutable type (e.g. a Head
Re: [DISCUSS] KIP-82 - Add Record Headers
I wasn’t referring to the headers needing to be copied. I mean the fact that we’d be forcing a new producer record to be created, with all the contents copied.

I.e. what will happen is that a utility method will be created, or end up being used, which does this and returns the new ProducerRecord instance:

ProducerRecord addHeader(ProducerRecord record, Header header) {
    return new ProducerRecord(record.key, record.value, record.timestamp….., record.headers.concat(header));
}

To me this seems ugly, but it will be inevitable if we don’t make adding headers to existing records a simple, clean method call.

On 22/02/2017, 22:57, "Michael Pearce" wrote: Lazy init can achieve/avoid that. Re the concat, why don’t we implement that inside the Headers rather than causing everyone to implement this as adding headers in interceptors will be a dominant use case. We want a user friendly API. Having as a user having to code this instead of having the headers handle this for me seems redundant. On 22/02/2017, 22:34, "Jason Gustafson" wrote: I thought the argument was against creating the extra objects unnecessarily (i.e. if they were not accessed). And note that making the Headers immutable doesn't necessarily mean that they need to be copied: you can do a trick like Guava's Iterables.concat to add additional headers without changing the underlying collections. -Jason On Wed, Feb 22, 2017 at 2:22 PM, Michael Pearce wrote: > If the argument for not having a map holding the key, value pairs is due > to garbage creation of HashMap entry's, forcing the creation of a whole new > producer record to simply add a head, surely is creating a-lot more? 
> > From: Jason Gustafson > Sent: Wednesday, February 22, 2017 10:09 PM > To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > The current producer interceptor API is this: > > ProducerRecord onSend(ProducerRecord record); > > So adding a header means creating a new ProducerRecord with a new header > added to the current headers and returning it. Would that not work? > > -Jason > > On Wed, Feb 22, 2017 at 1:45 PM, Michael Pearce > wrote: > > > So how would you have this work if not mutable where interceptors would > > add headers? > > > > Sent using OWA for iPhone > > > > From: Jason Gustafson > > Sent: Wednesday, February 22, 2017 8:42:27 PM > > To: dev@kafka.apache.org > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > I think the point on the mutability of Headers is worth discussing a > little > > more. As far as I can tell, once the ProducerRecord (or ConsumerRecord) > is > > constructed, there should be no need to further change the headers. Is > that > > correct? If so, then why not enforce that that is the case through the > API? > > One problem with mutability it that it constrains the implementation of > > Headers. For example, if we were backing with a byte slice, would we > recopy > > the bytes if a header is added or would we maintain a satellite > collection > > of added records. Seems not great either way. If we really think > mutability > > is needed, perhaps we could add a method to headers to convert it to a > > mutable type (e.g. a Headers.toMap)? > > > > I'm also with Ismael about exposing Headers.get(). I thought it might > make > > sense to have a method like this instead: > > > > Iterable findMatching(Pattern pattern); > > > > This makes the (potential) need to scan the headers clear in the API. I'd > > also be fine exposing no getter at all. In general, Ï think it's good to > > start with a minimalistic API and work from there since it's always > easier > > to add methods than remove them. 
> > > > -Jason > > > > On Wed, Feb 22, 2017 at 9:16 AM, Michael Pearce > > wrote: > > > > > > > > Hi Ismael > > > > > > On point 1, > > > > > > Sure makes sense will update short
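To make the cost discussed at the top of this message concrete, the copy-everything utility could be sketched as below. This is a sketch under assumptions, not the real Kafka classes: `SimpleRecord`, `Header` and `RecordUtil` are hypothetical, simplified stand-ins, and the field set (key, value, timestamp, headers) is chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a header: a String key and a byte[] value.
final class Header {
    final String key;
    final byte[] value;

    Header(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical, simplified immutable record; not the real ProducerRecord.
final class SimpleRecord {
    final String key;
    final String value;
    final long timestamp;
    final List<Header> headers;

    SimpleRecord(String key, String value, long timestamp, List<Header> headers) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        // Defensive copy so the record cannot be changed after construction.
        this.headers = Collections.unmodifiableList(new ArrayList<>(headers));
    }
}

final class RecordUtil {
    private RecordUtil() {}

    // Copies every field of the record into a new instance with one extra header.
    static SimpleRecord addHeader(SimpleRecord record, Header header) {
        List<Header> combined = new ArrayList<>(record.headers);
        combined.add(header);
        return new SimpleRecord(record.key, record.value, record.timestamp, combined);
    }
}
```

Every call allocates a new record and a new header list, which is the overhead the message argues against forcing on every interceptor.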
Re: [DISCUSS] KIP-82 - Add Record Headers
Lazy init can avoid that.

Re the concat: why don’t we implement that inside the Headers, rather than making everyone implement it themselves? Adding headers in interceptors will be a dominant use case. We want a user-friendly API. As a user, having to code this myself instead of having the Headers handle it for me seems redundant.

On 22/02/2017, 22:34, "Jason Gustafson" wrote: I thought the argument was against creating the extra objects unnecessarily (i.e. if they were not accessed). And note that making the Headers immutable doesn't necessarily mean that they need to be copied: you can do a trick like Guava's Iterables.concat to add additional headers without changing the underlying collections. -Jason On Wed, Feb 22, 2017 at 2:22 PM, Michael Pearce wrote: > If the argument for not having a map holding the key, value pairs is due > to garbage creation of HashMap entry's, forcing the creation of a whole new > producer record to simply add a head, surely is creating a-lot more? > > From: Jason Gustafson > Sent: Wednesday, February 22, 2017 10:09 PM > To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > The current producer interceptor API is this: > > ProducerRecord onSend(ProducerRecord record); > > So adding a header means creating a new ProducerRecord with a new header > added to the current headers and returning it. Would that not work? > > -Jason > > On Wed, Feb 22, 2017 at 1:45 PM, Michael Pearce > wrote: > > > So how would you have this work if not mutable where interceptors would > > add headers? > > > > Sent using OWA for iPhone > > > > From: Jason Gustafson > > Sent: Wednesday, February 22, 2017 8:42:27 PM > > To: dev@kafka.apache.org > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > I think the point on the mutability of Headers is worth discussing a > little > > more. 
As far as I can tell, once the ProducerRecord (or ConsumerRecord) > is > > constructed, there should be no need to further change the headers. Is > that > > correct? If so, then why not enforce that that is the case through the > API? > > One problem with mutability it that it constrains the implementation of > > Headers. For example, if we were backing with a byte slice, would we > recopy > > the bytes if a header is added or would we maintain a satellite > collection > > of added records. Seems not great either way. If we really think > mutability > > is needed, perhaps we could add a method to headers to convert it to a > > mutable type (e.g. a Headers.toMap)? > > > > I'm also with Ismael about exposing Headers.get(). I thought it might > make > > sense to have a method like this instead: > > > > Iterable findMatching(Pattern pattern); > > > > This makes the (potential) need to scan the headers clear in the API. I'd > > also be fine exposing no getter at all. In general, Ï think it's good to > > start with a minimalistic API and work from there since it's always > easier > > to add methods than remove them. > > > > -Jason > > > > On Wed, Feb 22, 2017 at 9:16 AM, Michael Pearce > > wrote: > > > > > > > > Hi Ismael > > > > > > On point 1, > > > > > > Sure makes sense will update shortly. > > > > > > On point 2, > > > > > > Setter/getter typical to properties/headers api’s traditionally are map > > > styled interfaces and what I believe is most expected styled thus the > > Key, > > > Value setter. > > > Also it would mean rather than an interface, we would be making our > > > internal header impl object we have for the array, exposed. E.g. if we > > had > > > a Map really this would be Map.Entry interface, this is the same > reasons > > on > > > the map interface I cannot actually make the underlying Node object > > that’s > > > the implementation for HashMap, so that internals can be changed. 
> > > > > > > > > On point 3, > > > > > > I think it people do expect it to be performant, thus why originall
Re: [DISCUSS] KIP-82 - Add Record Headers
I thought the argument was against creating the extra objects unnecessarily (i.e. if they were not accessed). And note that making the Headers immutable doesn't necessarily mean that they need to be copied: you can do a trick like Guava's Iterables.concat to add additional headers without changing the underlying collections. -Jason On Wed, Feb 22, 2017 at 2:22 PM, Michael Pearce wrote: > If the argument for not having a map holding the key, value pairs is due > to garbage creation of HashMap entry's, forcing the creation of a whole new > producer record to simply add a head, surely is creating a-lot more? > > From: Jason Gustafson > Sent: Wednesday, February 22, 2017 10:09 PM > To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > The current producer interceptor API is this: > > ProducerRecord onSend(ProducerRecord record); > > So adding a header means creating a new ProducerRecord with a new header > added to the current headers and returning it. Would that not work? > > -Jason > > On Wed, Feb 22, 2017 at 1:45 PM, Michael Pearce > wrote: > > > So how would you have this work if not mutable where interceptors would > > add headers? > > > > Sent using OWA for iPhone > > > > From: Jason Gustafson > > Sent: Wednesday, February 22, 2017 8:42:27 PM > > To: dev@kafka.apache.org > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > I think the point on the mutability of Headers is worth discussing a > little > > more. As far as I can tell, once the ProducerRecord (or ConsumerRecord) > is > > constructed, there should be no need to further change the headers. Is > that > > correct? If so, then why not enforce that that is the case through the > API? > > One problem with mutability it that it constrains the implementation of > > Headers. For example, if we were backing with a byte slice, would we > recopy > > the bytes if a header is added or would we maintain a satellite > collection > > of added records. Seems not great either way. 
If we really think > mutability > > is needed, perhaps we could add a method to headers to convert it to a > > mutable type (e.g. a Headers.toMap)? > > > > I'm also with Ismael about exposing Headers.get(). I thought it might > make > > sense to have a method like this instead: > > > > Iterable findMatching(Pattern pattern); > > > > This makes the (potential) need to scan the headers clear in the API. I'd > > also be fine exposing no getter at all. In general, Ï think it's good to > > start with a minimalistic API and work from there since it's always > easier > > to add methods than remove them. > > > > -Jason > > > > On Wed, Feb 22, 2017 at 9:16 AM, Michael Pearce > > wrote: > > > > > > > > Hi Ismael > > > > > > On point 1, > > > > > > Sure makes sense will update shortly. > > > > > > On point 2, > > > > > > Setter/getter typical to properties/headers api’s traditionally are map > > > styled interfaces and what I believe is most expected styled thus the > > Key, > > > Value setter. > > > Also it would mean rather than an interface, we would be making our > > > internal header impl object we have for the array, exposed. E.g. if we > > had > > > a Map really this would be Map.Entry interface, this is the same > reasons > > on > > > the map interface I cannot actually make the underlying Node object > > that’s > > > the implementation for HashMap, so that internals can be changed. > > > > > > > > > On point 3, > > > > > > I think it people do expect it to be performant, thus why originally > > > concern I raised with using an array for to me is an early memory > > > optimisation. I think the user experience of properties/headers is on a > > > get/set model. This is why its important we have encapsulated logic > that > > > then allows us to change this to a map, if this becomes and issue, and > > the > > > memory overhead of hashmap is less so. 
> > > > > > > > > > > > > > > On 22/02/2017, 15:56, "isma...@gmail.com on behalf of Ismael Juma" < > > > isma...@gmail.com on behalf of ism...@juma.me.uk> wrote: > > > > > > Hi all, > > > > > > Great to see the progress that has been achieved on this one. :) A > > few > > > comme
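The "trick like Guava's Iterables.concat" mentioned at the top of this message can be sketched in plain Java, without Guava, as a read-only view over two existing collections. This is an illustrative sketch under assumptions: `Header` and `ConcatHeaders` are hypothetical names, and the sketch shows the general lazy-concatenation idea rather than Guava's actual implementation.

```java
import java.util.Iterator;

// Hypothetical stand-in for a header: a String key and a byte[] value.
final class Header {
    final String key;
    final byte[] value;

    Header(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical read-only view: iterates `first`, then `second`, copying neither.
final class ConcatHeaders implements Iterable<Header> {
    private final Iterable<Header> first;
    private final Iterable<Header> second;

    ConcatHeaders(Iterable<Header> first, Iterable<Header> second) {
        this.first = first;
        this.second = second;
    }

    // Returns a new view; neither this view nor `more` is modified.
    ConcatHeaders append(Iterable<Header> more) {
        return new ConcatHeaders(this, more);
    }

    @Override
    public Iterator<Header> iterator() {
        final Iterator<Header> a = first.iterator();
        final Iterator<Header> b = second.iterator();
        return new Iterator<Header>() {
            @Override
            public boolean hasNext() {
                return a.hasNext() || b.hasNext();
            }

            @Override
            public Header next() {
                // Drain the first source before moving on to the second.
                return a.hasNext() ? a.next() : b.next();
            }
        };
    }
}
```

Because this is a view, later changes to the underlying collections would show through, so the sketch only behaves as an immutable value if the sources themselves are never modified.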
Re: [DISCUSS] KIP-82 - Add Record Headers
If the argument for not having a map holding the key/value pairs is the garbage created by HashMap entries, then surely forcing the creation of a whole new producer record simply to add a header creates a lot more?

From: Jason Gustafson Sent: Wednesday, February 22, 2017 10:09 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-82 - Add Record Headers The current producer interceptor API is this: ProducerRecord onSend(ProducerRecord record); So adding a header means creating a new ProducerRecord with a new header added to the current headers and returning it. Would that not work? -Jason On Wed, Feb 22, 2017 at 1:45 PM, Michael Pearce wrote: > So how would you have this work if not mutable where interceptors would > add headers? > > Sent using OWA for iPhone > > From: Jason Gustafson > Sent: Wednesday, February 22, 2017 8:42:27 PM > To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > I think the point on the mutability of Headers is worth discussing a little > more. As far as I can tell, once the ProducerRecord (or ConsumerRecord) is > constructed, there should be no need to further change the headers. Is that > correct? If so, then why not enforce that that is the case through the API? > One problem with mutability it that it constrains the implementation of > Headers. For example, if we were backing with a byte slice, would we recopy > the bytes if a header is added or would we maintain a satellite collection > of added records. Seems not great either way. If we really think mutability > is needed, perhaps we could add a method to headers to convert it to a > mutable type (e.g. a Headers.toMap)? > > I'm also with Ismael about exposing Headers.get(). I thought it might make > sense to have a method like this instead: > > Iterable findMatching(Pattern pattern); > > This makes the (potential) need to scan the headers clear in the API. I'd > also be fine exposing no getter at all. 
In general, Ï think it's good to > start with a minimalistic API and work from there since it's always easier > to add methods than remove them. > > -Jason > > On Wed, Feb 22, 2017 at 9:16 AM, Michael Pearce > wrote: > > > > > Hi Ismael > > > > On point 1, > > > > Sure makes sense will update shortly. > > > > On point 2, > > > > Setter/getter typical to properties/headers api’s traditionally are map > > styled interfaces and what I believe is most expected styled thus the > Key, > > Value setter. > > Also it would mean rather than an interface, we would be making our > > internal header impl object we have for the array, exposed. E.g. if we > had > > a Map really this would be Map.Entry interface, this is the same reasons > on > > the map interface I cannot actually make the underlying Node object > that’s > > the implementation for HashMap, so that internals can be changed. > > > > > > On point 3, > > > > I think it people do expect it to be performant, thus why originally > > concern I raised with using an array for to me is an early memory > > optimisation. I think the user experience of properties/headers is on a > > get/set model. This is why its important we have encapsulated logic that > > then allows us to change this to a map, if this becomes and issue, and > the > > memory overhead of hashmap is less so. > > > > > > > > > > On 22/02/2017, 15:56, "isma...@gmail.com on behalf of Ismael Juma" < > > isma...@gmail.com on behalf of ism...@juma.me.uk> wrote: > > > > Hi all, > > > > Great to see the progress that has been achieved on this one. :) A > few > > comments regarding the APIs (I'm still reviewing the message format > > changes): > > > > 1. Nit: `getHeaders` in `ProducerRecord` and `ConsumerRecord` should > be > > named `headers` (we avoid the `get` prefix in Kafka) > > > > 2. The `Headers` class is mutable (there's an `add` method). Does it > > need > > to be? If so, it would be good to explain why. 
Related to that, we > > should > > also explain the thinking around thread-safety. If we keep the `add` > > method, it may make sense for it to take a `Header` (that way we can > > add > > things to `Header` without changing the interface). > > > > 3. Do we need the `Headers.get()` method? People usually assume that > > `get` > > would be efficient, but depending on the implementation (the current > > proposal states that an array
Re: [DISCUSS] KIP-82 - Add Record Headers
The current producer interceptor API is this: ProducerRecord onSend(ProducerRecord record); So adding a header means creating a new ProducerRecord with a new header added to the current headers and returning it. Would that not work? -Jason On Wed, Feb 22, 2017 at 1:45 PM, Michael Pearce wrote: > So how would you have this work if not mutable where interceptors would > add headers? > > Sent using OWA for iPhone > > From: Jason Gustafson > Sent: Wednesday, February 22, 2017 8:42:27 PM > To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > I think the point on the mutability of Headers is worth discussing a little > more. As far as I can tell, once the ProducerRecord (or ConsumerRecord) is > constructed, there should be no need to further change the headers. Is that > correct? If so, then why not enforce that that is the case through the API? > One problem with mutability it that it constrains the implementation of > Headers. For example, if we were backing with a byte slice, would we recopy > the bytes if a header is added or would we maintain a satellite collection > of added records. Seems not great either way. If we really think mutability > is needed, perhaps we could add a method to headers to convert it to a > mutable type (e.g. a Headers.toMap)? > > I'm also with Ismael about exposing Headers.get(). I thought it might make > sense to have a method like this instead: > > Iterable findMatching(Pattern pattern); > > This makes the (potential) need to scan the headers clear in the API. I'd > also be fine exposing no getter at all. In general, Ï think it's good to > start with a minimalistic API and work from there since it's always easier > to add methods than remove them. > > -Jason > > On Wed, Feb 22, 2017 at 9:16 AM, Michael Pearce > wrote: > > > > > Hi Ismael > > > > On point 1, > > > > Sure makes sense will update shortly. 
> > > > On point 2, > > > > Setter/getter typical to properties/headers api’s traditionally are map > > styled interfaces and what I believe is most expected styled thus the > Key, > > Value setter. > > Also it would mean rather than an interface, we would be making our > > internal header impl object we have for the array, exposed. E.g. if we > had > > a Map really this would be Map.Entry interface, this is the same reasons > on > > the map interface I cannot actually make the underlying Node object > that’s > > the implementation for HashMap, so that internals can be changed. > > > > > > On point 3, > > > > I think it people do expect it to be performant, thus why originally > > concern I raised with using an array for to me is an early memory > > optimisation. I think the user experience of properties/headers is on a > > get/set model. This is why its important we have encapsulated logic that > > then allows us to change this to a map, if this becomes and issue, and > the > > memory overhead of hashmap is less so. > > > > > > > > > > On 22/02/2017, 15:56, "isma...@gmail.com on behalf of Ismael Juma" < > > isma...@gmail.com on behalf of ism...@juma.me.uk> wrote: > > > > Hi all, > > > > Great to see the progress that has been achieved on this one. :) A > few > > comments regarding the APIs (I'm still reviewing the message format > > changes): > > > > 1. Nit: `getHeaders` in `ProducerRecord` and `ConsumerRecord` should > be > > named `headers` (we avoid the `get` prefix in Kafka) > > > > 2. The `Headers` class is mutable (there's an `add` method). Does it > > need > > to be? If so, it would be good to explain why. Related to that, we > > should > > also explain the thinking around thread-safety. If we keep the `add` > > method, it may make sense for it to take a `Header` (that way we can > > add > > things to `Header` without changing the interface). > > > > 3. Do we need the `Headers.get()` method? 
People usually assume that > > `get` > > would be efficient, but depending on the implementation (the current > > proposal states that an array would be used), it may not be. If we > > expect > > the number of headers to be small, it doesn't matter though. > > > > Ismael > > > > On Tue, Feb 21, 2017 at 6:38 PM, Michael Pearce < > michael.pea...@ig.com > > > > > wrote: > > > > > Hi Jason, > > > > > > Have converted the interface/
Re: [DISCUSS] KIP-82 - Add Record Headers
So if Headers are not mutable, how would you have interceptors add headers?

From: Jason Gustafson Sent: Wednesday, February 22, 2017 8:42:27 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-82 - Add Record Headers I think the point on the mutability of Headers is worth discussing a little more. As far as I can tell, once the ProducerRecord (or ConsumerRecord) is constructed, there should be no need to further change the headers. Is that correct? If so, then why not enforce that that is the case through the API? One problem with mutability it that it constrains the implementation of Headers. For example, if we were backing with a byte slice, would we recopy the bytes if a header is added or would we maintain a satellite collection of added records. Seems not great either way. If we really think mutability is needed, perhaps we could add a method to headers to convert it to a mutable type (e.g. a Headers.toMap)? I'm also with Ismael about exposing Headers.get(). I thought it might make sense to have a method like this instead: Iterable findMatching(Pattern pattern); This makes the (potential) need to scan the headers clear in the API. I'd also be fine exposing no getter at all. In general, Ï think it's good to start with a minimalistic API and work from there since it's always easier to add methods than remove them. -Jason On Wed, Feb 22, 2017 at 9:16 AM, Michael Pearce wrote: > > Hi Ismael > > On point 1, > > Sure makes sense will update shortly. > > On point 2, > > Setter/getter typical to properties/headers api’s traditionally are map > styled interfaces and what I believe is most expected styled thus the Key, > Value setter. > Also it would mean rather than an interface, we would be making our > internal header impl object we have for the array, exposed. E.g. 
if we had > a Map really this would be Map.Entry interface, this is the same reasons on > the map interface I cannot actually make the underlying Node object that’s > the implementation for HashMap, so that internals can be changed. > > > On point 3, > > I think it people do expect it to be performant, thus why originally > concern I raised with using an array for to me is an early memory > optimisation. I think the user experience of properties/headers is on a > get/set model. This is why its important we have encapsulated logic that > then allows us to change this to a map, if this becomes and issue, and the > memory overhead of hashmap is less so. > > > > > On 22/02/2017, 15:56, "isma...@gmail.com on behalf of Ismael Juma" < > isma...@gmail.com on behalf of ism...@juma.me.uk> wrote: > > Hi all, > > Great to see the progress that has been achieved on this one. :) A few > comments regarding the APIs (I'm still reviewing the message format > changes): > > 1. Nit: `getHeaders` in `ProducerRecord` and `ConsumerRecord` should be > named `headers` (we avoid the `get` prefix in Kafka) > > 2. The `Headers` class is mutable (there's an `add` method). Does it > need > to be? If so, it would be good to explain why. Related to that, we > should > also explain the thinking around thread-safety. If we keep the `add` > method, it may make sense for it to take a `Header` (that way we can > add > things to `Header` without changing the interface). > > 3. Do we need the `Headers.get()` method? People usually assume that > `get` > would be efficient, but depending on the implementation (the current > proposal states that an array would be used), it may not be. If we > expect > the number of headers to be small, it doesn't matter though. > > Ismael > > On Tue, Feb 21, 2017 at 6:38 PM, Michael Pearce > > wrote: > > > Hi Jason, > > > > Have converted the interface/api bullets into interface code > snippets. > > > > Agreed implementation won’t take too long. 
We have early versions > already. > > Maybe a week before you think about merging I would assume it would > be more > > stabilised? I was thinking then we could fork from your confluent > branch, > > making and then holding KIP-82 changes in a patch file, that we can > then > > re-fork from apache once KIP98 final is merged, and apply patch with > last > > minute changes. > > > > Cheers > > Mike > > > > > > On 22/02/2017, 00:56, "Jason Gustafson" wrote: > > > > Hey Michael, > > > > Awesome. I have a minor request. The APIs are currently > documented as a > >
Re: [DISCUSS] KIP-82 - Add Record Headers
I think the point on the mutability of Headers is worth discussing a little more. As far as I can tell, once the ProducerRecord (or ConsumerRecord) is constructed, there should be no need to further change the headers. Is that correct? If so, then why not enforce that that is the case through the API? One problem with mutability is that it constrains the implementation of Headers. For example, if we were backing with a byte slice, would we recopy the bytes if a header is added, or would we maintain a satellite collection of added records? Seems not great either way. If we really think mutability is needed, perhaps we could add a method to headers to convert it to a mutable type (e.g. a Headers.toMap)?

I'm also with Ismael about exposing Headers.get(). I thought it might make sense to have a method like this instead:

Iterable findMatching(Pattern pattern);

This makes the (potential) need to scan the headers clear in the API. I'd also be fine exposing no getter at all. In general, I think it's good to start with a minimalistic API and work from there since it's always easier to add methods than remove them.

-Jason

On Wed, Feb 22, 2017 at 9:16 AM, Michael Pearce wrote: > > Hi Ismael > > On point 1, > > Sure makes sense will update shortly. > > On point 2, > > Setter/getter typical to properties/headers api’s traditionally are map > styled interfaces and what I believe is most expected styled thus the Key, > Value setter. > Also it would mean rather than an interface, we would be making our > internal header impl object we have for the array, exposed. E.g. if we had > a Map really this would be Map.Entry interface, this is the same reasons on > the map interface I cannot actually make the underlying Node object that’s > the implementation for HashMap, so that internals can be changed. > > > On point 3, > > I think it people do expect it to be performant, thus why originally > concern I raised with using an array for to me is an early memory > optimisation. 
I think the user experience of properties/headers is on a > get/set model. This is why its important we have encapsulated logic that > then allows us to change this to a map, if this becomes and issue, and the > memory overhead of hashmap is less so. > > > > > On 22/02/2017, 15:56, "isma...@gmail.com on behalf of Ismael Juma" < > isma...@gmail.com on behalf of ism...@juma.me.uk> wrote: > > Hi all, > > Great to see the progress that has been achieved on this one. :) A few > comments regarding the APIs (I'm still reviewing the message format > changes): > > 1. Nit: `getHeaders` in `ProducerRecord` and `ConsumerRecord` should be > named `headers` (we avoid the `get` prefix in Kafka) > > 2. The `Headers` class is mutable (there's an `add` method). Does it > need > to be? If so, it would be good to explain why. Related to that, we > should > also explain the thinking around thread-safety. If we keep the `add` > method, it may make sense for it to take a `Header` (that way we can > add > things to `Header` without changing the interface). > > 3. Do we need the `Headers.get()` method? People usually assume that > `get` > would be efficient, but depending on the implementation (the current > proposal states that an array would be used), it may not be. If we > expect > the number of headers to be small, it doesn't matter though. > > Ismael > > On Tue, Feb 21, 2017 at 6:38 PM, Michael Pearce > > wrote: > > > Hi Jason, > > > > Have converted the interface/api bullets into interface code > snippets. > > > > Agreed implementation won’t take too long. We have early versions > already. > > Maybe a week before you think about merging I would assume it would > be more > > stabilised? I was thinking then we could fork from your confluent > branch, > > making and then holding KIP-82 changes in a patch file, that we can > then > > re-fork from apache once KIP98 final is merged, and apply patch with > last > > minute changes. 
> > > > Cheers > > Mike > > > > > > On 22/02/2017, 00:56, "Jason Gustafson" wrote: > > > > Hey Michael, > > > > Awesome. I have a minor request. The APIs are currently > documented as a > > wiki list. Would you mind adding a code snippet instead? It's a > bit > > easier > > to process. > > > > How will be best to manage this, as we will obviously build off > your > > KIP’s > > > protocol changes, to avoid a merge hell, should we branch from > your > > branch > > > in the confluent repo or is it worth having a KIP-98 special > branch > > in the > > > apache git, that we can branch/fork from? > > > > > > I was thinking about this also. Ideally we'd like to get the > changes > > in as > > close together as possible since we only want one magic bump and > some > > users > > deploy trunk. Th
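To make Jason's suggestion above concrete, here is a minimal sketch of a read-only headers type with a pattern-based lookup and a `toMap` escape hatch. All class and method names here (`ReadOnlyHeaders`, its nested `Header`) are hypothetical and are not the KIP-82 interfaces; only `findMatching(Pattern)` and `toMap` are taken from the email.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical names throughout; this is not the KIP-82 API.
final class ReadOnlyHeaders {
    /** One key/value pair; the value is copied so callers cannot mutate it. */
    static final class Header {
        private final String key;
        private final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value.clone();
        }

        String key() { return key; }
        byte[] value() { return value.clone(); }
    }

    private final List<Header> headers;

    ReadOnlyHeaders(List<Header> headers) {
        // Copy once at construction; there is no add() afterwards.
        this.headers = Collections.unmodifiableList(new ArrayList<>(headers));
    }

    /** Linear scan over all headers; the Pattern argument makes that cost visible. */
    Iterable<Header> findMatching(Pattern pattern) {
        List<Header> matches = new ArrayList<>();
        for (Header h : headers) {
            if (pattern.matcher(h.key()).matches()) {
                matches.add(h);
            }
        }
        return matches;
    }

    /** Mutable copy for callers that need one; a later duplicate key overwrites an earlier one. */
    Map<String, byte[]> toMap() {
        Map<String, byte[]> copy = new LinkedHashMap<>();
        for (Header h : headers) {
            copy.put(h.key(), h.value());
        }
        return copy;
    }
}
```

With this shape the backing store never changes after construction, so a byte-slice-backed implementation would not need to recopy bytes or track a side collection.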
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Ismael, On point 1: sure, makes sense, will update shortly. On point 2: setters/getters in properties/headers APIs are traditionally map-styled interfaces, which I believe is the style most people expect, hence the key/value setter. Taking a `Header` would also mean that, rather than an interface, we would be exposing the internal header implementation object we have for the array. E.g. if we had a Map, this would really be the Map.Entry interface; it is the same reason that, on the Map interface, I cannot directly create the underlying Node object that is HashMap's implementation, so that the internals can be changed. On point 3: I think people do expect it to be performant, which is why I originally raised the concern that using an array is, to me, an early memory optimisation. I think the user experience of properties/headers is based on a get/set model. This is why it's important that we have encapsulated logic that allows us to change this to a map if lookup cost becomes an issue and the memory overhead of a HashMap is less of a concern. On 22/02/2017, 15:56, "isma...@gmail.com on behalf of Ismael Juma" wrote: Hi all, Great to see the progress that has been achieved on this one. :) A few comments regarding the APIs (I'm still reviewing the message format changes): 1. Nit: `getHeaders` in `ProducerRecord` and `ConsumerRecord` should be named `headers` (we avoid the `get` prefix in Kafka) 2. The `Headers` class is mutable (there's an `add` method). Does it need to be? If so, it would be good to explain why. Related to that, we should also explain the thinking around thread-safety. If we keep the `add` method, it may make sense for it to take a `Header` (that way we can add things to `Header` without changing the interface). 3. Do we need the `Headers.get()` method? People usually assume that `get` would be efficient, but depending on the implementation (the current proposal states that an array would be used), it may not be. 
If we expect the number of headers to be small, it doesn't matter though. 
Ismael On Tue, Feb 21, 2017 at 6:38 PM, Michael Pearce wrote: > Hi Jason, > > Have converted the interface/api bullets into interface code snippets. > > Agreed implementation won’t take too long. We have early versions already. > Maybe a week before you think about merging I would assume it would be more > stabilised? I was thinking then we could fork from your confluent branch, > making and then holding KIP-82 changes in a patch file, that we can then > re-fork from apache once KIP98 final is merged, and apply patch with last > minute changes. > > Cheers > Mike > > > On 22/02/2017, 00:56, "Jason Gustafson" wrote: > > Hey Michael, > > Awesome. I have a minor request. The APIs are currently documented as a > wiki list. Would you mind adding a code snippet instead? It's a bit > easier > to process. > > How will be best to manage this, as we will obviously build off your > KIP’s > > protocol changes, to avoid a merge hell, should we branch from your > branch > > in the confluent repo or is it worth having a KIP-98 special branch > in the > > apache git, that we can branch/fork from? > > > I was thinking about this also. Ideally we'd like to get the changes > in as > close together as possible since we only want one magic bump and some > users > deploy trunk. The level of effort to change the format for headers > seems > not too high. Do you agree? My guess is that the KIP-98 message format > patch will take 2-3 weeks to review before we merge to trunk, so you > could > hold off implementing until that patch has somewhat stabilized. That > would > save some potential rebase pain. > > -Jason
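Michael's encapsulation argument on points 2 and 3 can be illustrated roughly as follows: callers code against a small interface, and the storage behind it (a list today, possibly a map later) stays private. This is only a sketch; `SketchHeaders`, `ArrayBackedHeaders` and `lastValue` are made-up names for illustration, not the interfaces proposed in the KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names are illustrative, not the KIP-82 interfaces.
interface SketchHeaders {
    /** Key/value setter in the map style the email argues users expect. */
    SketchHeaders add(String key, byte[] value);

    /** Returns the value of the last header added under this key, or null if absent. */
    byte[] lastValue(String key);
}

/** List-backed storage: small memory footprint, O(n) lookup. */
final class ArrayBackedHeaders implements SketchHeaders {
    private final List<String> keys = new ArrayList<>();
    private final List<byte[]> values = new ArrayList<>();

    @Override
    public SketchHeaders add(String key, byte[] value) {
        keys.add(key);
        values.add(value);
        return this;
    }

    @Override
    public byte[] lastValue(String key) {
        // Scan from the end so the most recently added duplicate wins.
        for (int i = keys.size() - 1; i >= 0; i--) {
            if (keys.get(i).equals(key)) {
                return values.get(i);
            }
        }
        return null;
    }
}
```

Because nothing outside `ArrayBackedHeaders` sees the two lists, a map-backed class could implement the same interface later without touching callers, which is the trade-off Ismael's point 3 and Michael's reply are weighing.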
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi all, Great to see the progress that has been achieved on this one. :) A few comments regarding the APIs (I'm still reviewing the message format changes): 1. Nit: `getHeaders` in `ProducerRecord` and `ConsumerRecord` should be named `headers` (we avoid the `get` prefix in Kafka) 2. The `Headers` class is mutable (there's an `add` method). Does it need to be? If so, it would be good to explain why. Related to that, we should also explain the thinking around thread-safety. If we keep the `add` method, it may make sense for it to take a `Header` (that way we can add things to `Header` without changing the interface). 3. Do we need the `Headers.get()` method? People usually assume that `get` would be efficient, but depending on the implementation (the current proposal states that an array would be used), it may not be. If we expect the number of headers to be small, it doesn't matter though. Ismael On Tue, Feb 21, 2017 at 6:38 PM, Michael Pearce wrote: > Hi Jason, > > Have converted the interface/api bullets into interface code snippets. > > Agreed implementation won’t take too long. We have early versions already. > Maybe a week before you think about merging I would assume it would be more > stabilised? I was thinking then we could fork from your confluent branch, > making and then holding KIP-82 changes in a patch file, that we can then > re-fork from apache once KIP98 final is merged, and apply patch with last > minute changes. > > Cheers > Mike > > > On 22/02/2017, 00:56, "Jason Gustafson" wrote: > > Hey Michael, > > Awesome. I have a minor request. The APIs are currently documented as a > wiki list. Would you mind adding a code snippet instead? It's a bit > easier > to process. > > How will be best to manage this, as we will obviously build off your > KIP’s > > protocol changes, to avoid a merge hell, should we branch from your > branch > > in the confluent repo or is it worth having a KIP-98 special branch > in the > > apache git, that we can branch/fork from? 
> > > I was thinking about this also. Ideally we'd like to get the changes > in as > close together as possible since we only want one magic bump and some > users > deploy trunk. The level of effort to change the format for headers > seems > not too high. Do you agree? My guess is that the KIP-98 message format > patch will take 2-3 weeks to review before we merge to trunk, so you > could > hold off implementing until that patch has somewhat stabilized. That > would > save some potential rebase pain. > > -Jason
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Jason, Have converted the interface/api bullets into interface code snippets. Agreed implementation won’t take too long. We have early versions already. Maybe a week before you think about merging I would assume it would be more stabilised? I was thinking then we could fork from your confluent branch, making and then holding KIP-82 changes in a patch file, that we can then re-fork from apache once KIP98 final is merged, and apply patch with last minute changes. Cheers Mike On 22/02/2017, 00:56, "Jason Gustafson" wrote: Hey Michael, Awesome. I have a minor request. The APIs are currently documented as a wiki list. Would you mind adding a code snippet instead? It's a bit easier to process. How will be best to manage this, as we will obviously build off your KIP’s > protocol changes, to avoid a merge hell, should we branch from your branch > in the confluent repo or is it worth having a KIP-98 special branch in the > apache git, that we can branch/fork from? I was thinking about this also. Ideally we'd like to get the changes in as close together as possible since we only want one magic bump and some users deploy trunk. The level of effort to change the format for headers seems not too high. Do you agree? My guess is that the KIP-98 message format patch will take 2-3 weeks to review before we merge to trunk, so you could hold off implementing until that patch has somewhat stabilized. That would save some potential rebase pain. -Jason
Re: [DISCUSS] KIP-82 - Add Record Headers
Hey Michael, Awesome. I have a minor request. The APIs are currently documented as a wiki list. Would you mind adding a code snippet instead? It's a bit easier to process.
> How will be best to manage this, as we will obviously build off your KIP’s protocol changes, to avoid a merge hell, should we branch from your branch in the confluent repo or is it worth having a KIP-98 special branch in the apache git, that we can branch/fork from?
I was thinking about this also. Ideally we'd like to get the changes in as close together as possible since we only want one magic bump and some users deploy trunk. The level of effort to change the format for headers seems not too high. Do you agree? My guess is that the KIP-98 message format patch will take 2-3 weeks to review before we merge to trunk, so you could hold off implementing until that patch has somewhat stabilized. That would save some potential rebase pain. -Jason
Re: [DISCUSS] KIP-82 - Add Record Headers
; > slice of bytes, > > > > which > > > > > then gets parsed later if needed, or can be parsed > > right away, the > > > > headers > > > > > is part of the protocol, so can still be validated > > if wanted. > > > > > > > > > > If you had a header count then you would have to go > > through each > > > > header > > > > > key and value length value to work out how much to > > skip to get to > > > > say the > > > > > value or any future component in the message after > > the headers. > > > > Having it > > > > > as a byte[] with length value makes this a lot > > easier to skip. > > > > > > > > > > > > > > > On 17/02/2017, 20:37, "Michael Pearce" < > > michael.pea...@ig.com> > > > > wrote: > > > > > > > > > > What’s the issue with exposing a method > > getHeaders on the > > > > > producer/consumer record? It doesn’t break > anything. > > We don’t need > > > > any > > > > > special version. > > > > > > > > > > Current batch consumer model and consumer > > interceptors don’t > > > work > > > > > where headers need to be acted on at per message > > level at time of > > > > > processing, very case is APM (the core one), where > > the header value > > > > is used > > > > > to continue tracing. JMS/HTTP etc all expose these, > > without issues. > > > > I would > > > > > NOT want to lock this down to only be usable > > accessible via > > > > interceptors, > > > > > as we’d fail on one of the main goals. > > > > > > > > > > Regards > > > > > Mike > > > > > > > > > > > > > > > > > > > > > > > > > On 17/02/2017, 20:21, "Jason Gustafson" < > > ja...@confluent.io> > > > > wrote: > > > > > > > > > > The point about creation of maps seems > > orthogonal. We can > > > > still > > > > > represent > > > > > the headers as a slice of bytes until the > > time it is > > > > accessed. 
> > > > > > > > > > > > > > > > Yes exactly we have access to the records > > thus why the > > > > header > > > > > should be > > > > > > accessible via it and not hidden for only > > interceptors to > > > > access. > > > > > > > > > > > > > > > As explained above, the point is to make > the > > intended usage > > > > clear. > > > > > Applications should continue to rely on the > > key/value > > > fields > > > > to > > > > > serialize > > > > > their own headers, and it would be more > > ideal if we can > > > avoid > > > > > leaking > > > > > third-party headers into applications. This > > is difficult to > > > > do > > > > > with the > > > > > current interceptors because they share the > > record objects > > > > with > > > > > the common > > > > > API. What I had in mind is something like > an > > extension of > >
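The skipping argument quoted above (a single length-prefixed byte[] versus a header count) can be sketched as below. Both layouts are assumptions made up for this illustration, including the field widths; neither is claimed to be the format under discussion for KIP-82.

```java
import java.nio.ByteBuffer;

// Hypothetical layouts for illustration only; not the KIP-82 wire format.
final class HeaderSkipping {
    /** Layout A: [int32 totalLength][header bytes...]. One read, one jump. */
    static void skipLengthPrefixed(ByteBuffer buf) {
        int totalLength = buf.getInt();
        buf.position(buf.position() + totalLength);
    }

    /**
     * Layout B: [int32 count], then per header
     * [int16 keyLength][key][int32 valueLength][value].
     * Every header's lengths must be read to find the end.
     */
    static void skipCounted(ByteBuffer buf) {
        int count = buf.getInt();
        for (int i = 0; i < count; i++) {
            int keyLength = buf.getShort();
            buf.position(buf.position() + keyLength);
            int valueLength = buf.getInt();
            buf.position(buf.position() + valueLength);
        }
    }
}
```

Layout A costs one read regardless of how many headers there are, while Layout B touches every header, which is the extra work the email describes for reaching the value or any later field in the message.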
Re: [DISCUSS] KIP-82 - Add Record Headers
< > ja...@confluent.io> > > > wrote: > > > > > > > > The point about creation of maps seems > orthogonal. We can > > > still > > > > represent > > > > the headers as a slice of bytes until the > time it is > > > accessed. > > > > > > > > > > > > > Yes exactly we have access to the records > thus why the > > > header > > > > should be > > > > > accessible via it and not hidden for only > interceptors to > > > access. > > > > > > > > > > > > As explained above, the point is to make the > intended usage > > > clear. > > > > Applications should continue to rely on the > key/value > > fields > > > to > > > > serialize > > > > their own headers, and it would be more > ideal if we can > > avoid > > > > leaking > > > > third-party headers into applications. This > is difficult to > > > do > > > > with the > > > > current interceptors because they share the > record objects > > > with > > > > the common > > > > API. What I had in mind is something like an > extension of > > the > > > > current > > > > interceptors which exposed a different > object (e.g. > > > > `RecordAndHeaders`). > > > > The challenge is for MM-like use cases. Let > me see if I can > > > come > > > > up with a > > > > concrete proposal for that problem. > > > > > > > > -Jason > > > > > > > > > > > > > > > > On Fri, Feb 17, 2017 at 11:55 AM, Michael > Pearce < > > > > michael.pea...@ig.com> > > > > wrote: > > > > > > > > > I am happy to move the definition of the > header into the > > > message > > > > body, but > > > > > would cause us not to lazy > initialise/parse the headers, > > as > > > > obviously, we > > > > > would have to traverse these reading the > message. > > > > > > > > > > This was actually one of Jay’s requests: > > > > > > > > > > “2. I think we should think about > creating the Map > > > lazily to > > > > avoid > > > > > parsing out all the headers into > little objects. 
> > > HashMaps > > > > themselves > > > > > are > > > > > kind of expensive and the consumer is > very perf > > > sensitive so > > > > and making > > > > > gazillions of hashmaps that may or may > not get used > > is > > > > probably a bad > > > > > idea.” > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On 17/02/2017, 19:44, "Michael Pearce" < > > > michael.pea...@ig.com> > > > > wrote: > > > > > > > > > > Yes exactly we have access to the > records thus why >
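Jay's request for lazy creation of the map, together with Jason's remark that headers can stay a slice of bytes until accessed, might look roughly like this. The class name `LazyHeaders`, the `isParsed` helper and the byte layout are all assumptions for the sketch, and it is not thread-safe.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; the byte layout is an assumption, not the KIP-82 format.
final class LazyHeaders {
    private final ByteBuffer raw;        // left untouched until the first lookup
    private Map<String, byte[]> parsed;  // built at most once (not thread-safe)

    LazyHeaders(ByteBuffer raw) {
        this.raw = raw.asReadOnlyBuffer();
    }

    /** Exposed only so the laziness can be observed in this sketch. */
    boolean isParsed() {
        return parsed != null;
    }

    byte[] get(String key) {
        if (parsed == null) {
            parsed = parse(raw.duplicate());
        }
        return parsed.get(key);
    }

    // Assumed layout, repeated until the buffer is exhausted:
    // [int16 keyLength][key as UTF-8][int32 valueLength][value]
    private static Map<String, byte[]> parse(ByteBuffer buf) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        while (buf.hasRemaining()) {
            byte[] key = new byte[buf.getShort()];
            buf.get(key);
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            out.put(new String(key, StandardCharsets.UTF_8), value);
        }
        return out;
    }
}
```

A consumer that never calls `get` pays only for holding the buffer reference, so no per-record HashMap is allocated for records whose headers are never read.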
Re: [DISCUSS] KIP-82 - Add Record Headers
any future component in the message after the headers. > > Having it > > > as a byte[] with length value makes this a lot easier to skip. > > > > > > > > > On 17/02/2017, 20:37, "Michael Pearce" > > wrote: > > > > > > What’s the issue with exposing a method getHeaders on the > > > producer/consumer record? It doesn’t break anything. We don’t need > > any > > > special version. > > > > > > Current batch consumer model and consumer interceptors don’t > work > > > where headers need to be acted on at per message level at time of > > > processing, very case is APM (the core one), where the header value > > is used > > > to continue tracing. JMS/HTTP etc all expose these, without issues. > > I would > > > NOT want to lock this down to only be usable accessible via > > interceptors, > > > as we’d fail on one of the main goals. > > > > > > Regards > > > Mike > > > > > > > > > > > > > > > On 17/02/2017, 20:21, "Jason Gustafson" > > wrote: > > > > > > The point about creation of maps seems orthogonal. We can > > still > > > represent > > > the headers as a slice of bytes until the time it is > > accessed. > > > > > > > > > > Yes exactly we have access to the records thus why the > > header > > > should be > > > > accessible via it and not hidden for only interceptors to > > access. > > > > > > > > > As explained above, the point is to make the intended usage > > clear. > > > Applications should continue to rely on the key/value > fields > > to > > > serialize > > > their own headers, and it would be more ideal if we can > avoid > > > leaking > > > third-party headers into applications. This is difficult to > > do > > > with the > > > current interceptors because they share the record objects > > with > > > the common > > > API. What I had in mind is something like an extension of > the > > > current > > > interceptors which exposed a different object (e.g. > > > `RecordAndHeaders`). > > > The challenge is for MM-like use cases. 
Let me see if I can > > come > > > up with a > > > concrete proposal for that problem. > > > > > > -Jason > > > > > > > > > > > > On Fri, Feb 17, 2017 at 11:55 AM, Michael Pearce < > > > michael.pea...@ig.com> > > > wrote: > > > > > > > I am happy to move the definition of the header into the > > message > > > body, but > > > > would cause us not to lazy initialise/parse the headers, > as > > > obviously, we > > > > would have to traverse these reading the message. > > > > > > > > This was actually one of Jay’s requests: > > > > > > > > “2. I think we should think about creating the Map > > lazily to > > > avoid > > > > parsing out all the headers into little objects. > > HashMaps > > > themselves > > > > are > > > > kind of expensive and the consumer is very perf > > sensitive so > > > and making > >
Re: [DISCUSS] KIP-82 - Add Record Headers
ceptors, > > > as we’d fail on one of the main goals. > > > > > > Regards > > > Mike > > > > > > > > > > > > > > > On 17/02/2017, 20:21, "Jason Gustafson" > > wrote: > > > > > > The point about creation of maps seems orthogonal. We can > > still > > > represent > > > the headers as a slice of bytes until the time it is > > accessed. > > > > > > > > > > Yes exactly we have access to the records thus why the > > header > > > should be > > > > accessible via it and not hidden for only interceptors to > > access. > > > > > > > > > As explained above, the point is to make the intended usage > > clear. > > > Applications should continue to rely on the key/value > fields > > to > > > serialize > > > their own headers, and it would be more ideal if we can > avoid > > > leaking > > > third-party headers into applications. This is difficult to > > do > > > with the > > > current interceptors because they share the record objects > > with > > > the common > > > API. What I had in mind is something like an extension of > the > > > current > > > interceptors which exposed a different object (e.g. > > > `RecordAndHeaders`). > > > The challenge is for MM-like use cases. Let me see if I can > > come > > > up with a > > > concrete proposal for that problem. > > > > > > -Jason > > > > > > > > > > > > On Fri, Feb 17, 2017 at 11:55 AM, Michael Pearce < > > > michael.pea...@ig.com> > > > wrote: > > > > > > > I am happy to move the definition of the header into the > > message > > > body, but > > > > would cause us not to lazy initialise/parse the headers, > as > > > obviously, we > > > > would have to traverse these reading the message. > > > > > > > > This was actually one of Jay’s requests: > > > > > > > > “2. I think we should think about creating the Map > > lazily to > > > avoid > > > > parsing out all the headers into little objects. 
> > HashMaps > > > themselves > > > > are > > > > kind of expensive and the consumer is very perf > > sensitive so > > > and making > > > > gazillions of hashmaps that may or may not get used > is > > > probably a bad > > > > idea.” > > > > > > > > > > > > > > > > > > > > > > > > On 17/02/2017, 19:44, "Michael Pearce" < > > michael.pea...@ig.com> > > > wrote: > > > > > > > > Yes exactly we have access to the records thus why > the > > > header should > > > > be accessible via it and not hidden for only interceptors > > to > > > access. > > > > > > > > Sent using OWA for iPhone > > > > > > > > From: Magnus Edenhill > > > > Sent: Friday, February 17, 2017 7:34:49 PM > > > > To: dev@kafka.apache.org > > > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > > > > > Big +1 on VarInts. > > > > CPUs are fast, memory is slow. > > > > >
Re: [DISCUSS] KIP-82 - Add Record Headers
which exposed a different object (e.g. > > > `RecordAndHeaders`). > > > The challenge is for MM-like use cases. Let me see if I can > > come > > > up with a > > > concrete proposal for that problem. > > > > > > -Jason > > > > > > > > > > > > On Fri, Feb 17, 2017 at 11:55 AM, Michael Pearce < > > > michael.pea...@ig.com> > > > wrote: > > > > > > > I am happy to move the definition of the header into the > > message > > > body, but > > > > would cause us not to lazy initialise/parse the headers, > as > > > obviously, we > > > > would have to traverse these reading the message. > > > > > > > > This was actually one of Jay’s requests: > > > > > > > > “2. I think we should think about creating the Map > > lazily to > > > avoid > > > > parsing out all the headers into little objects. > > HashMaps > > > themselves > > > > are > > > > kind of expensive and the consumer is very perf > > sensitive so > > > and making > > > > gazillions of hashmaps that may or may not get used > is > > > probably a bad > > > > idea.” > > > > > > > > > > > > > > > > > > > > > > > > On 17/02/2017, 19:44, "Michael Pearce" < > > michael.pea...@ig.com> > > > wrote: > > > > > > > > Yes exactly we have access to the records thus why > the > > > header should > > > > be accessible via it and not hidden for only interceptors > > to > > > access. > > > > > > > > Sent using OWA for iPhone > > > > > > > > From: Magnus Edenhill > > > > Sent: Friday, February 17, 2017 7:34:49 PM > > > > To: dev@kafka.apache.org > > > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > > > > > Big +1 on VarInts. > > > > CPUs are fast, memory is slow. > > > > > > > > I agree with Jason that we'll want to continue > > verifying > > > messages, > > > > including their headers, so while I appreciate the > > idea of > > > the opaque > > > > header blob it won't be useful in practice. 
> > > > > > > > /Magnus > > > > > > > > 2017-02-17 10:41 GMT-08:00 Jason Gustafson < > > > ja...@confluent.io>: > > > > > > > > > Sorry, my mistake. The consumer interceptor is per > > batch, > > > though I'm > > > > not > > > > > sure that's an actual limitation since you still > have > > > access to the > > > > > individual records. > > > > > > > > > > -Jason > > > > > > > > > > On Fri, Feb 17, 2017 at 10:39 AM, Jason Gustafson < > > > > ja...@confluent.io> > > > > > wrote: > > > > > > > > > > > Re headers as byte array and future use by > broker. > > This > > > doesn't > > > > take away > > > > > >> from that at all. Nor makes it difficult at all > > in my > > > opinion. > > > > > > > > > > > > > > > > > > Yeah, I didn't say it was difficult, only > awkward. > > You > > > wouldn't > > > > write the > > > > > > schema that way if you were planning to use it on > > the > > > brokers from > > > > the >
Re: [DISCUSS] KIP-82 - Add Record Headers
o lock this down to only be usable accessible via > interceptors, > > as we’d fail on one of the main goals. > > > > Regards > > Mike > > > > > > > > > > On 17/02/2017, 20:21, "Jason Gustafson" > wrote: > > > > The point about creation of maps seems orthogonal. We can > still > > represent > > the headers as a slice of bytes until the time it is > accessed. > > > > > > > Yes exactly we have access to the records thus why the > header > > should be > > > accessible via it and not hidden for only interceptors to > access. > > > > > > As explained above, the point is to make the intended usage > clear. > > Applications should continue to rely on the key/value fields > to > > serialize > > their own headers, and it would be more ideal if we can avoid > > leaking > > third-party headers into applications. This is difficult to > do > > with the > > current interceptors because they share the record objects > with > > the common > > API. What I had in mind is something like an extension of the > > current > > interceptors which exposed a different object (e.g. > > `RecordAndHeaders`). > > The challenge is for MM-like use cases. Let me see if I can > come > > up with a > > concrete proposal for that problem. > > > > -Jason > > > > > > > > On Fri, Feb 17, 2017 at 11:55 AM, Michael Pearce < > > michael.pea...@ig.com> > > wrote: > > > > > I am happy to move the definition of the header into the > message > > body, but > > > would cause us not to lazy initialise/parse the headers, as > > obviously, we > > > would have to traverse these reading the message. > > > > > > This was actually one of Jay’s requests: > > > > > > “2. I think we should think about creating the Map > lazily to > > avoid > > > parsing out all the headers into little objects. 
> HashMaps > > themselves > > > are > > > kind of expensive and the consumer is very perf > sensitive so > > and making > > > gazillions of hashmaps that may or may not get used is > > probably a bad > > > idea.” > > > > > > > > > > > > > > > > > > On 17/02/2017, 19:44, "Michael Pearce" < > michael.pea...@ig.com> > > wrote: > > > > > > Yes exactly we have access to the records thus why the > > header should > > > be accessible via it and not hidden for only interceptors > to > > access. > > > > > > Sent using OWA for iPhone > > > > > > From: Magnus Edenhill > > > Sent: Friday, February 17, 2017 7:34:49 PM > > > To: dev@kafka.apache.org > > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > > > > > Big +1 on VarInts. > > > CPUs are fast, memory is slow. > > > > > > I agree with Jason that we'll want to continue > verifying > > messages, > > > including their headers, so while I appreciate the > idea of > > the opaque > > > header blob it won't be useful in practice. > > > > > > /Magnus > > > > > > 2017-02-17 10:41 GMT-08:00 Jason Gustafson < > > ja...@confluent.io>: > > > > > > > Sorry, my mistake. The consumer interceptor is per > batch, > > though I'm > > > not > > > > sure that's an actual limitation since you still have > > access to the > > > > individual records. > > >
Re: [DISCUSS] KIP-82 - Add Record Headers
Re: “The point about creation of maps seems orthogonal. We can still represent the headers as a slice of bytes until the time it is accessed.”

That’s exactly what we’re doing: the headers are a slice of bytes, which gets parsed later if needed or can be parsed right away. The headers are part of the protocol, so they can still be validated if wanted.

If you had a header count, you would have to walk each header’s key length and value length to work out how far to skip to reach, say, the value or any future component in the message after the headers. Having the headers as a byte[] with a length value makes them a lot easier to skip.

On 17/02/2017, 20:37, "Michael Pearce" wrote:

What’s the issue with exposing a method getHeaders on the producer/consumer record? It doesn’t break anything. We don’t need any special version.

The current batch consumer model and consumer interceptors don’t work where headers need to be acted on at per-message level at the time of processing. A key case is APM (the core one), where the header value is used to continue tracing.

JMS/HTTP etc. all expose these without issues. I would NOT want to lock this down to only be accessible via interceptors, as we’d fail on one of the main goals.

Regards
Mike

On 17/02/2017, 20:21, "Jason Gustafson" wrote:

The point about creation of maps seems orthogonal. We can still represent the headers as a slice of bytes until the time it is accessed.

> Yes exactly we have access to the records thus why the header should be
> accessible via it and not hidden for only interceptors to access.

As explained above, the point is to make the intended usage clear. Applications should continue to rely on the key/value fields to serialize their own headers, and it would be more ideal if we can avoid leaking third-party headers into applications. This is difficult to do with the current interceptors because they share the record objects with the common API.
What I had in mind is something like an extension of the current interceptors which exposed a different object (e.g. `RecordAndHeaders`). The challenge is for MM-like use cases. Let me see if I can come up with a concrete proposal for that problem. -Jason On Fri, Feb 17, 2017 at 11:55 AM, Michael Pearce wrote: > I am happy to move the definition of the header into the message body, but > would cause us not to lazy initialise/parse the headers, as obviously, we > would have to traverse these reading the message. > > This was actually one of Jay’s requests: > > “2. I think we should think about creating the Map lazily to avoid > parsing out all the headers into little objects. HashMaps themselves > are > kind of expensive and the consumer is very perf sensitive so and making > gazillions of hashmaps that may or may not get used is probably a bad > idea.” > > > > > > On 17/02/2017, 19:44, "Michael Pearce" wrote: > > Yes exactly we have access to the records thus why the header should > be accessible via it and not hidden for only interceptors to access. > > Sent using OWA for iPhone > > From: Magnus Edenhill > Sent: Friday, February 17, 2017 7:34:49 PM > To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > Big +1 on VarInts. > CPUs are fast, memory is slow. > > I agree with Jason that we'll want to continue verifying messages, > including their headers, so while I appreciate the idea of the opaque > header blob it won't be useful in practice. > > /Magnus > > 2017-02-17 10:41 GMT-08:00 Jason Gustafson : > > > Sorry, my mistake. The consumer interceptor is per batch, though I'm > not > > sure that's an actual limitation since you still have access to the > > individual records. > > > > -Jason > > > > On Fri, Feb 17, 2017 at 10:39 AM, Jason Gustafson < > ja...@confluent.io> > > wrote: > > > > > Re headers as byte array and future use by broker. This doesn't > take away > > >> fro
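As an illustration of the skipping argument made at the top of this message, here is a minimal Java sketch comparing the two layouts. Both layouts, the fixed 4-byte int length fields, and every name in it (HeaderSkipSketch, skipLengthPrefixed, skipCounted, encodeEntries, message) are assumptions made for the example. This is not the KIP-82 wire format, which was still under discussion at this point in the thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical layouts for illustration only; not the KIP-82 wire format.
final class HeaderSkipSketch {

    // Layout A: [int headersByteLength][header entries...][rest of message]
    // One read and one position bump skips every header.
    static void skipLengthPrefixed(ByteBuffer buf) {
        int headersByteLength = buf.getInt();
        buf.position(buf.position() + headersByteLength);
    }

    // Layout B: [int headerCount][header entries...][rest of message]
    // Each entry is [int keyLen][key][int valueLen][value], so every entry must be walked.
    static void skipCounted(ByteBuffer buf) {
        int headerCount = buf.getInt();
        for (int i = 0; i < headerCount; i++) {
            int keyLen = buf.getInt();
            buf.position(buf.position() + keyLen);
            int valueLen = buf.getInt();
            buf.position(buf.position() + valueLen);
        }
    }

    // Encodes entries as [int keyLen][key][int valueLen][value], repeated.
    static byte[] encodeEntries(String[] keys, String[] values) {
        ByteBuffer buf = ByteBuffer.allocate(1024); // ample for this small example
        for (int i = 0; i < keys.length; i++) {
            byte[] k = keys[i].getBytes(StandardCharsets.UTF_8);
            byte[] v = values[i].getBytes(StandardCharsets.UTF_8);
            buf.putInt(k.length).put(k).putInt(v.length).put(v);
        }
        byte[] out = new byte[buf.position()];
        buf.flip();
        buf.get(out);
        return out;
    }

    // Prepends `prefix` (a byte length for layout A, a count for layout B)
    // and appends one marker byte standing in for the rest of the message.
    static ByteBuffer message(int prefix, byte[] entries, byte marker) {
        ByteBuffer buf = ByteBuffer.allocate(4 + entries.length + 1);
        buf.putInt(prefix).put(entries).put(marker);
        buf.flip();
        return buf;
    }
}
```

Both skip methods leave the buffer positioned at the marker byte. Under these assumptions, layout A gets there with a single length read, while layout B needs one pair of length reads per header.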
Re: [DISCUSS] KIP-82 - Add Record Headers
What’s the issue with exposing a method getHeaders on the producer/consumer record? It doesn’t break anything. We don’t need any special version.

The current batch consumer model and consumer interceptors don’t work where headers need to be acted on at per-message level at the time of processing. A key case is APM (the core one), where the header value is used to continue tracing.

JMS/HTTP etc. all expose these without issues. I would NOT want to lock this down to only be accessible via interceptors, as we’d fail on one of the main goals.

Regards
Mike

On 17/02/2017, 20:21, "Jason Gustafson" wrote:

The point about creation of maps seems orthogonal. We can still represent the headers as a slice of bytes until the time it is accessed.

> Yes exactly we have access to the records thus why the header should be
> accessible via it and not hidden for only interceptors to access.

As explained above, the point is to make the intended usage clear. Applications should continue to rely on the key/value fields to serialize their own headers, and it would be more ideal if we can avoid leaking third-party headers into applications. This is difficult to do with the current interceptors because they share the record objects with the common API. What I had in mind is something like an extension of the current interceptors which exposed a different object (e.g. `RecordAndHeaders`). The challenge is for MM-like use cases. Let me see if I can come up with a concrete proposal for that problem.

-Jason

On Fri, Feb 17, 2017 at 11:55 AM, Michael Pearce wrote:

> I am happy to move the definition of the header into the message body, but
> would cause us not to lazy initialise/parse the headers, as obviously, we
> would have to traverse these reading the message.
>
> This was actually one of Jay’s requests:
>
> “2. I think we should think about creating the Map lazily to avoid
> parsing out all the headers into little objects.
HashMaps themselves > are > kind of expensive and the consumer is very perf sensitive so and making > gazillions of hashmaps that may or may not get used is probably a bad > idea.” > > > > > > On 17/02/2017, 19:44, "Michael Pearce" wrote: > > Yes exactly we have access to the records thus why the header should > be accessible via it and not hidden for only interceptors to access. > > Sent using OWA for iPhone > > From: Magnus Edenhill > Sent: Friday, February 17, 2017 7:34:49 PM > To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > Big +1 on VarInts. > CPUs are fast, memory is slow. > > I agree with Jason that we'll want to continue verifying messages, > including their headers, so while I appreciate the idea of the opaque > header blob it won't be useful in practice. > > /Magnus > > 2017-02-17 10:41 GMT-08:00 Jason Gustafson : > > > Sorry, my mistake. The consumer interceptor is per batch, though I'm > not > > sure that's an actual limitation since you still have access to the > > individual records. > > > > -Jason > > > > On Fri, Feb 17, 2017 at 10:39 AM, Jason Gustafson < > ja...@confluent.io> > > wrote: > > > > > Re headers as byte array and future use by broker. This doesn't > take away > > >> from that at all. Nor makes it difficult at all in my opinion. > > > > > > > > > Yeah, I didn't say it was difficult, only awkward. You wouldn't > write the > > > schema that way if you were planning to use it on the brokers from > the > > > beginning. Note also that one of the benefits of letting the broker > > > understand headers is that it can validate that they are properly > > > formatted. If cost is the only concern, we should confirm its > impact > > > through performance testing. > > > > > > One of the key use cases requires access on consume at per > event/message > > >> level at the point that message is being processed, as such the > batch > > >> interceptors and batch consume api isn't suitable. 
It needs to be > at the > > >> record level. > > > > >
Re: [DISCUSS] KIP-82 - Add Record Headers
The point about creation of maps seems orthogonal. We can still represent the headers as a slice of bytes until the time it is accessed. > Yes exactly we have access to the records thus why the header should be > accessible via it and not hidden for only interceptors to access. As explained above, the point is to make the intended usage clear. Applications should continue to rely on the key/value fields to serialize their own headers, and it would be more ideal if we can avoid leaking third-party headers into applications. This is difficult to do with the current interceptors because they share the record objects with the common API. What I had in mind is something like an extension of the current interceptors which exposed a different object (e.g. `RecordAndHeaders`). The challenge is for MM-like use cases. Let me see if I can come up with a concrete proposal for that problem. -Jason On Fri, Feb 17, 2017 at 11:55 AM, Michael Pearce wrote: > I am happy to move the definition of the header into the message body, but > would cause us not to lazy initialise/parse the headers, as obviously, we > would have to traverse these reading the message. > > This was actually one of Jay’s requests: > > “2. I think we should think about creating the Map lazily to avoid > parsing out all the headers into little objects. HashMaps themselves > are > kind of expensive and the consumer is very perf sensitive so and making > gazillions of hashmaps that may or may not get used is probably a bad > idea.” > > > > > > On 17/02/2017, 19:44, "Michael Pearce" wrote: > > Yes exactly we have access to the records thus why the header should > be accessible via it and not hidden for only interceptors to access. > > Sent using OWA for iPhone > > From: Magnus Edenhill > Sent: Friday, February 17, 2017 7:34:49 PM > To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > > Big +1 on VarInts. > CPUs are fast, memory is slow. 
> > I agree with Jason that we'll want to continue verifying messages, > including their headers, so while I appreciate the idea of the opaque > header blob it won't be useful in practice. > > /Magnus > > 2017-02-17 10:41 GMT-08:00 Jason Gustafson : > > > Sorry, my mistake. The consumer interceptor is per batch, though I'm > not > > sure that's an actual limitation since you still have access to the > > individual records. > > > > -Jason > > > > On Fri, Feb 17, 2017 at 10:39 AM, Jason Gustafson < > ja...@confluent.io> > > wrote: > > > > > Re headers as byte array and future use by broker. This doesn't > take away > > >> from that at all. Nor makes it difficult at all in my opinion. > > > > > > > > > Yeah, I didn't say it was difficult, only awkward. You wouldn't > write the > > > schema that way if you were planning to use it on the brokers from > the > > > beginning. Note also that one of the benefits of letting the broker > > > understand headers is that it can validate that they are properly > > > formatted. If cost is the only concern, we should confirm its > impact > > > through performance testing. > > > > > > One of the key use cases requires access on consume at per > event/message > > >> level at the point that message is being processed, as such the > batch > > >> interceptors and batch consume api isn't suitable. It needs to be > at the > > >> record level. > > > > > > > > > I'm not sure I understand the point about batching. Interceptors > are > > > applied per-message, right? > > > > > > My intent on interceptors is to keep the usage of headers > well-defined so > > > that they don't start leaking unnecessarily into applications. My > guess > > is > > > that it's probably inevitable, but isolating it in the > interceptors would > > > at least give people a second thought before deciding to use it. > The main > > > challenge in my mind is figuring out how an MM use case would > work. 
It > > > would be more cumbersome to replicate headers through an > interceptor, > > > though arguably MM should be working at a lower level anyway. > > > > > > -Jason > > > > > > On Fri, Feb 17, 2017 at 10:16 AM, Michael Pearce < >
Re: [DISCUSS] KIP-82 - Add Record Headers
I am happy to move the definition of the header into the message body, but that would stop us from lazily initialising/parsing the headers, as we would obviously have to traverse them while reading the message.

This was actually one of Jay’s requests:

“2. I think we should think about creating the Map lazily to avoid parsing out all the headers into little objects. HashMaps themselves are kind of expensive and the consumer is very perf sensitive so and making gazillions of hashmaps that may or may not get used is probably a bad idea.”

On 17/02/2017, 19:44, "Michael Pearce" wrote:

Yes exactly we have access to the records thus why the header should be accessible via it and not hidden for only interceptors to access.

Sent using OWA for iPhone

From: Magnus Edenhill
Sent: Friday, February 17, 2017 7:34:49 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

Big +1 on VarInts.
CPUs are fast, memory is slow.

I agree with Jason that we'll want to continue verifying messages, including their headers, so while I appreciate the idea of the opaque header blob it won't be useful in practice.

/Magnus

2017-02-17 10:41 GMT-08:00 Jason Gustafson :

> Sorry, my mistake. The consumer interceptor is per batch, though I'm not
> sure that's an actual limitation since you still have access to the
> individual records.
>
> -Jason
>
> On Fri, Feb 17, 2017 at 10:39 AM, Jason Gustafson
> wrote:
>
> > Re headers as byte array and future use by broker. This doesn't take away
> >> from that at all. Nor makes it difficult at all in my opinion.
> >
> >
> > Yeah, I didn't say it was difficult, only awkward. You wouldn't write the
> > schema that way if you were planning to use it on the brokers from the
> > beginning. Note also that one of the benefits of letting the broker
> > understand headers is that it can validate that they are properly
> > formatted. If cost is the only concern, we should confirm its impact
> > through performance testing.
> > > > One of the key use cases requires access on consume at per event/message > >> level at the point that message is being processed, as such the batch > >> interceptors and batch consume api isn't suitable. It needs to be at the > >> record level. > > > > > > I'm not sure I understand the point about batching. Interceptors are > > applied per-message, right? > > > > My intent on interceptors is to keep the usage of headers well-defined so > > that they don't start leaking unnecessarily into applications. My guess > is > > that it's probably inevitable, but isolating it in the interceptors would > > at least give people a second thought before deciding to use it. The main > > challenge in my mind is figuring out how an MM use case would work. It > > would be more cumbersome to replicate headers through an interceptor, > > though arguably MM should be working at a lower level anyway. > > > > -Jason > > > > On Fri, Feb 17, 2017 at 10:16 AM, Michael Pearce > > wrote: > > > >> Re headers available on the record va interceptors only > >> > >> One of the key use cases requires access on consume at per event/message > >> level at the point that message is being processed, as such the batch > >> interceptors and batch consume api isn't suitable. It needs to be at the > >> record level. > >> > >> This anyhow is similar to jms/http/amqp where headers are available to > >> consuming applications. > >> > >> Re headers as byte array and future use by broker. This doesn't take > away > >> from that at all. Nor makes it difficult at all in my opinion. > >> > >> > >> > >> Sent using OWA for iPhone > >> > >> From: Jason Gustafson > >> Sent: Friday, February 17, 2017 5:55:42 PM > >> To: dev@kafka.apache.org > >> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > >> > >> > > >> > Would you be proposing in KIP-98 to convert the other message int’s > (key > >> > length, value length) also to varint to keep it uniform. > >> > Also I assume there will b
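The lazy parsing that Jay asked for, and that this message wants to preserve, can be sketched roughly as follows. This is a minimal illustration, not the KIP's implementation: the class name LazyHeadersSketch, the entry layout ([int keyLen][key][int valueLen][value], repeated) and the parseCount counter are all assumptions made for the example. The raw bytes are kept as-is and the map is only built the first time a header is requested.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical class for illustration; the entry layout is an assumption.
final class LazyHeadersSketch {
    private final ByteBuffer raw;        // undecoded header bytes
    private Map<String, byte[]> parsed;  // stays null until a header is first requested
    private int parseCount = 0;          // counts parses, only to make the laziness observable

    LazyHeadersSketch(ByteBuffer raw) {
        this.raw = raw.asReadOnlyBuffer();
    }

    // Returns the value for `key`, or null if absent. With duplicate keys the last one wins.
    byte[] header(String key) {
        return map().get(key);
    }

    int parseCount() {
        return parseCount;
    }

    // Parses the raw bytes on first use; later calls reuse the cached map.
    private Map<String, byte[]> map() {
        if (parsed == null) {
            Map<String, byte[]> result = new LinkedHashMap<>();
            ByteBuffer buf = raw.duplicate(); // independent position, same bytes
            while (buf.hasRemaining()) {
                byte[] k = new byte[buf.getInt()];
                buf.get(k);
                byte[] v = new byte[buf.getInt()];
                buf.get(v);
                result.put(new String(k, StandardCharsets.UTF_8), v);
            }
            parsed = result;
            parseCount++;
        }
        return parsed;
    }
}
```

A consumer that never calls header(...) never allocates the map, which is the cost Jay's comment was concerned about. The sketch is not thread-safe, and a real implementation would also need to validate the lengths it reads.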
Re: [DISCUSS] KIP-82 - Add Record Headers
Yes, exactly: we have access to the records, which is why the headers should be accessible via the record and not hidden so that only interceptors can access them.

Sent using OWA for iPhone

From: Magnus Edenhill
Sent: Friday, February 17, 2017 7:34:49 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

Big +1 on VarInts.
CPUs are fast, memory is slow.

I agree with Jason that we'll want to continue verifying messages, including their headers, so while I appreciate the idea of the opaque header blob it won't be useful in practice.

/Magnus

2017-02-17 10:41 GMT-08:00 Jason Gustafson :

> Sorry, my mistake. The consumer interceptor is per batch, though I'm not
> sure that's an actual limitation since you still have access to the
> individual records.
>
> -Jason
>
> On Fri, Feb 17, 2017 at 10:39 AM, Jason Gustafson
> wrote:
>
> > Re headers as byte array and future use by broker. This doesn't take away
> >> from that at all. Nor makes it difficult at all in my opinion.
> >
> >
> > Yeah, I didn't say it was difficult, only awkward. You wouldn't write the
> > schema that way if you were planning to use it on the brokers from the
> > beginning. Note also that one of the benefits of letting the broker
> > understand headers is that it can validate that they are properly
> > formatted. If cost is the only concern, we should confirm its impact
> > through performance testing.
> >
> > One of the key use cases requires access on consume at per event/message
> >> level at the point that message is being processed, as such the batch
> >> interceptors and batch consume api isn't suitable. It needs to be at the
> >> record level.
> >
> >
> > I'm not sure I understand the point about batching. Interceptors are
> > applied per-message, right?
> >
> > My intent on interceptors is to keep the usage of headers well-defined so
> > that they don't start leaking unnecessarily into applications.
My guess > is > > that it's probably inevitable, but isolating it in the interceptors would > > at least give people a second thought before deciding to use it. The main > > challenge in my mind is figuring out how an MM use case would work. It > > would be more cumbersome to replicate headers through an interceptor, > > though arguably MM should be working at a lower level anyway. > > > > -Jason > > > > On Fri, Feb 17, 2017 at 10:16 AM, Michael Pearce > > wrote: > > > >> Re headers available on the record va interceptors only > >> > >> One of the key use cases requires access on consume at per event/message > >> level at the point that message is being processed, as such the batch > >> interceptors and batch consume api isn't suitable. It needs to be at the > >> record level. > >> > >> This anyhow is similar to jms/http/amqp where headers are available to > >> consuming applications. > >> > >> Re headers as byte array and future use by broker. This doesn't take > away > >> from that at all. Nor makes it difficult at all in my opinion. > >> > >> > >> > >> Sent using OWA for iPhone > >> > >> From: Jason Gustafson > >> Sent: Friday, February 17, 2017 5:55:42 PM > >> To: dev@kafka.apache.org > >> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > >> > >> > > >> > Would you be proposing in KIP-98 to convert the other message int’s > (key > >> > length, value length) also to varint to keep it uniform. > >> > Also I assume there will be a static or helper method made to > write/read > >> > these in the client and server. > >> > >> > >> Yes, that is what we are proposing, so using varints for headers would > be > >> consistent with the rest of the message. We have used static helper > >> methods > >> in our prototype implementation. > >> > >> The cost of parsing, we want to parse/interpret the headers lazily (this > >> is > >> > a key point brought up earlier in discussions) > >> > >> > >> I'm a bit skeptical of this. Has anyone done the performance testing? 
I > >> can > >> probably implement it and test it if no one else has. I was also under > the > >> impression that there may be use cases down the road where the broker > >> would > >> need to interpret headers. That wouldn't be off the table in the future
Re: [DISCUSS] KIP-82 - Add Record Headers
Big +1 on VarInts. CPUs are fast, memory is slow. I agree with Jason that we'll want to continue verifying messages, including their headers, so while I appreciate the idea of the opaque header blob it won't be useful in practice. /Magnus 2017-02-17 10:41 GMT-08:00 Jason Gustafson : > Sorry, my mistake. The consumer interceptor is per batch, though I'm not > sure that's an actual limitation since you still have access to the > individual records. > > -Jason > > On Fri, Feb 17, 2017 at 10:39 AM, Jason Gustafson > wrote: > > > Re headers as byte array and future use by broker. This doesn't take away > >> from that at all. Nor makes it difficult at all in my opinion. > > > > > > Yeah, I didn't say it was difficult, only awkward. You wouldn't write the > > schema that way if you were planning to use it on the brokers from the > > beginning. Note also that one of the benefits of letting the broker > > understand headers is that it can validate that they are properly > > formatted. If cost is the only concern, we should confirm its impact > > through performance testing. > > > > One of the key use cases requires access on consume at per event/message > >> level at the point that message is being processed, as such the batch > >> interceptors and batch consume api isn't suitable. It needs to be at the > >> record level. > > > > > > I'm not sure I understand the point about batching. Interceptors are > > applied per-message, right? > > > > My intent on interceptors is to keep the usage of headers well-defined so > > that they don't start leaking unnecessarily into applications. My guess > is > > that it's probably inevitable, but isolating it in the interceptors would > > at least give people a second thought before deciding to use it. The main > > challenge in my mind is figuring out how an MM use case would work. It > > would be more cumbersome to replicate headers through an interceptor, > > though arguably MM should be working at a lower level anyway. 
> > > > -Jason > > > > On Fri, Feb 17, 2017 at 10:16 AM, Michael Pearce > > wrote: > > > >> Re headers available on the record va interceptors only > >> > >> One of the key use cases requires access on consume at per event/message > >> level at the point that message is being processed, as such the batch > >> interceptors and batch consume api isn't suitable. It needs to be at the > >> record level. > >> > >> This anyhow is similar to jms/http/amqp where headers are available to > >> consuming applications. > >> > >> Re headers as byte array and future use by broker. This doesn't take > away > >> from that at all. Nor makes it difficult at all in my opinion. > >> > >> > >> > >> Sent using OWA for iPhone > >> > >> From: Jason Gustafson > >> Sent: Friday, February 17, 2017 5:55:42 PM > >> To: dev@kafka.apache.org > >> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers > >> > >> > > >> > Would you be proposing in KIP-98 to convert the other message int’s > (key > >> > length, value length) also to varint to keep it uniform. > >> > Also I assume there will be a static or helper method made to > write/read > >> > these in the client and server. > >> > >> > >> Yes, that is what we are proposing, so using varints for headers would > be > >> consistent with the rest of the message. We have used static helper > >> methods > >> in our prototype implementation. > >> > >> The cost of parsing, we want to parse/interpret the headers lazily (this > >> is > >> > a key point brought up earlier in discussions) > >> > >> > >> I'm a bit skeptical of this. Has anyone done the performance testing? I > >> can > >> probably implement it and test it if no one else has. I was also under > the > >> impression that there may be use cases down the road where the broker > >> would > >> need to interpret headers. That wouldn't be off the table in the future > if > >> it's represented as bytes, but it would be quite a bit more awkward, > >> right? 
> >> > >> By the way, one question I have been wondering about. My understanding > is > >> that headers are primarily for use cases where a third-party components > >> wants to enrich messages withou
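For readers unfamiliar with the varints being discussed, the following is a minimal Java sketch of one common scheme: base-128 varints with ZigZag mapping for signed values, as used by Protocol Buffers. The thread does not spell out the exact encoding, so treating it as ZigZag base-128 is an assumption, and the class and method names (VarintSketch, writeVarint, readVarint, encode) are hypothetical rather than the static helpers Jason mentions from the prototype.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; not the prototype's actual code.
final class VarintSketch {

    // Writes a signed int as a ZigZag-mapped base-128 varint (1 to 5 bytes).
    static void writeVarint(int value, ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31); // ZigZag: small magnitudes map to small unsigned values
        while ((v & 0xFFFFFF80) != 0) {       // more than 7 significant bits remain
            out.write((v & 0x7F) | 0x80);     // low 7 bits, continuation bit set
            v >>>= 7;
        }
        out.write(v);                         // final byte, continuation bit clear
    }

    // Reads a varint written by writeVarint and undoes the ZigZag mapping.
    static int readVarint(ByteBuffer in) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
            b = in.get();
            raw |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Convenience wrapper returning the encoded bytes.
    static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(value, out);
        return out.toByteArray();
    }
}
```

With this scheme a length such as 1 takes one byte and 300 takes two, versus four bytes for a fixed-width int. That is the space saving behind "CPUs are fast, memory is slow", bought with a few extra shift and mask operations per field.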
Re: [DISCUSS] KIP-82 - Add Record Headers
Sorry, my mistake. The consumer interceptor is per batch, though I'm not sure that's an actual limitation since you still have access to the individual records.

-Jason

On Fri, Feb 17, 2017 at 10:39 AM, Jason Gustafson wrote:
> [...]
Re: [DISCUSS] KIP-82 - Add Record Headers
> Re headers as byte array and future use by broker. This doesn't take away
> from that at all. Nor makes it difficult at all in my opinion.

Yeah, I didn't say it was difficult, only awkward. You wouldn't write the schema that way if you were planning to use it on the brokers from the beginning. Note also that one of the benefits of letting the broker understand headers is that it can validate that they are properly formatted. If cost is the only concern, we should confirm its impact through performance testing.

> One of the key use cases requires access on consume at per event/message
> level at the point that message is being processed, as such the batch
> interceptors and batch consume api isn't suitable. It needs to be at the
> record level.

I'm not sure I understand the point about batching. Interceptors are applied per-message, right?

My intent on interceptors is to keep the usage of headers well-defined so that they don't start leaking unnecessarily into applications. My guess is that it's probably inevitable, but isolating it in the interceptors would at least give people a second thought before deciding to use it. The main challenge in my mind is figuring out how a MirrorMaker (MM) use case would work. It would be more cumbersome to replicate headers through an interceptor, though arguably MM should be working at a lower level anyway.

-Jason

On Fri, Feb 17, 2017 at 10:16 AM, Michael Pearce wrote:
> [...]
Re: [DISCUSS] KIP-82 - Add Record Headers
Re headers available on the record vs interceptors only

One of the key use cases requires access on consume at the per event/message level, at the point that message is being processed; as such, the batch interceptors and batch consume API aren't suitable. It needs to be at the record level.

This is in any case similar to JMS/HTTP/AMQP, where headers are available to consuming applications.

Re headers as byte array and future use by broker

This doesn't take away from that at all. Nor does it make it difficult at all, in my opinion.

From: Jason Gustafson
Sent: Friday, February 17, 2017 5:55:42 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

> [...]
Re: [DISCUSS] KIP-82 - Add Record Headers
> Would you be proposing in KIP-98 to convert the other message int’s (key
> length, value length) also to varint to keep it uniform.
> Also I assume there will be a static or helper method made to write/read
> these in the client and server.

Yes, that is what we are proposing, so using varints for headers would be consistent with the rest of the message. We have used static helper methods in our prototype implementation.

> The cost of parsing, we want to parse/interpret the headers lazily (this is
> a key point brought up earlier in discussions)

I'm a bit skeptical of this. Has anyone done the performance testing? I can probably implement it and test it if no one else has. I was also under the impression that there may be use cases down the road where the broker would need to interpret headers. That wouldn't be off the table in the future if it's represented as bytes, but it would be quite a bit more awkward, right?

By the way, one question I have been wondering about. My understanding is that headers are primarily for use cases where third-party components want to enrich messages without needing to understand or modify the schema of the message key and value. For the applications which directly produce and consume the messages and control the key/value schema directly, it seems we would rather have them implement headers directly in their own schema. Supposing for the sake of argument that it was possible, my question is whether it would be sufficient to expose the headers in the interceptor API and not in the common API?

-Jason

On Fri, Feb 17, 2017 at 3:26 AM, Michael Pearce wrote:
> [...]
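For readers unfamiliar with the encoding being discussed, the following is a minimal sketch of the kind of static read/write helpers Jason mentions, using the base-128 scheme described in the protobuf encoding documentation. The class and method names (`VarintSketch`, `writeUnsignedVarint`, `readUnsignedVarint`, `sizeOfUnsignedVarint`) are hypothetical and are not taken from the KIP-98 prototype. The sketch handles unsigned 32-bit values only; how a null (-1) length would be represented is an open assumption it does not address.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper class; illustrative only, not the KIP-98 prototype code. */
final class VarintSketch {
    private VarintSketch() {}

    /** Writes value as a base-128 varint: 7 payload bits per byte, high bit set when more bytes follow. */
    static void writeUnsignedVarint(int value, ByteBuffer buf) {
        while ((value & 0xFFFFFF80) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80)); // low 7 bits plus continuation flag
            value >>>= 7;
        }
        buf.put((byte) value); // final byte, continuation flag clear
    }

    /** Reads a base-128 varint of at most 5 bytes (enough for 32 bits). */
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
        }
    }

    /** Number of bytes writeUnsignedVarint would emit for value. */
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }
}
```

Under this encoding a key length of 10 occupies 1 byte and a value length of 300 occupies 2 bytes, compared with 4 bytes each as int32, which is the packing saving the thread refers to.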
Re: [DISCUSS] KIP-82 - Add Record Headers
On the point of varints:

Would you be proposing in KIP-98 to convert the other message ints (key length, value length) to varint as well, to keep it uniform? Also, I assume there will be a static or helper method made to write/read these in the client and server.

Cheers
Mike

On 17/02/2017, 11:22, "Michael Pearce" wrote:
> [...]
Re: [DISCUSS] KIP-82 - Add Record Headers
On the point re: headers in the message protocol being a byte array and not a count of elements followed by the elements. Again, this was discussed/argued previously.

It was agreed on for a few reasons, some of which you have obviously picked up on:

Broker is able to pass it through opaquely.
The cost of parsing: we want to parse/interpret the headers lazily (this is a key point brought up earlier in discussions).
Headers can be copied from consumer record to producer record (e.g. mirror makers) without parsing if no changes are being made and nothing is being looked at.
Keeps the broker agnostic to the format.
You need an int32 either for the byte size of the headers or for the count of elements, so overheads are the same, but going with an opaque byte array has the above advantages.

Cheers
Mike

On 17/02/2017, 02:50, "Jason Gustafson" wrote:
> [...]
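As an illustration of the lazy-parsing argument in Mike's message, here is a minimal sketch of a client-side wrapper that holds the opaque header bytes and decodes them only on first lookup. Everything in it is an assumption for illustration: the class name `LazyHeadersSketch`, the `parseCount` field (present only to make the laziness observable), and the byte layout, which is taken to be repeated `[int32 keyLength][UTF-8 key][int32 valueLength][value]` entries as in the notation proposed elsewhere in this thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical wrapper; not the KIP-82 API. */
final class LazyHeadersSketch {
    private final byte[] raw;
    private List<String> keys;   // stays null until the first lookup
    private List<byte[]> values;
    int parseCount = 0;          // demonstration only: how often the bytes were decoded

    LazyHeadersSketch(byte[] raw) {
        this.raw = raw;
    }

    /** Pass-through path (e.g. a mirroring tool): returns the bytes without decoding them. */
    byte[] raw() {
        return raw;
    }

    /** Returns the value of the last header with the given key, or null if absent. */
    byte[] lastValue(String key) {
        parseIfNeeded();
        for (int i = keys.size() - 1; i >= 0; i--) {
            if (keys.get(i).equals(key)) {
                return values.get(i);
            }
        }
        return null;
    }

    private void parseIfNeeded() {
        if (keys != null) {
            return; // already decoded once
        }
        parseCount++;
        keys = new ArrayList<>();
        values = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(raw);
        while (buf.hasRemaining()) {
            byte[] k = new byte[buf.getInt()];
            buf.get(k);
            byte[] v = new byte[buf.getInt()];
            buf.get(v);
            keys.add(new String(k, StandardCharsets.UTF_8));
            values.add(v);
        }
    }
}
```

A caller that only forwards records would use `raw()` and never pay the decoding cost; a caller that inspects a header pays it once, on the first `lastValue` call.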
Re: [DISCUSS] KIP-82 - Add Record Headers
Sorry, should have noted that the performance testing was done using the producer performance tool shipped with Kafka.

-Jason

On Thu, Feb 16, 2017 at 6:44 PM, Jason Gustafson wrote:
> [...]
Re: [DISCUSS] KIP-82 - Add Record Headers
Hey Nacho, I've compared performance of our KIP-98 implementation with and without varints. For messages around 128 bytes, we see an increase in throughput of about 30% using the default configuration settings. At 256 bytes, the increase is around 16%. Obviously the performance converge as messages get larger, but it seems well worth the cost. Note that we are also seeing a substantial performance increase against trunk primarily because of the much more efficient packing that varints provide us. Anything adding to message overhead, such as record headers, would only increase the relative difference. (Of course take these numbers with a grain of salt since I have only used the default settings with both the producer and broker on my local machine. We intend to provide more extensive performance details as part of the work for KIP-98.) The implementation we are using is from protobuf ( https://developers.google.com/protocol-buffers/docs/encoding), which is also used in HBase. It is trivial to implement and as far as I know doesn't suffer from the aliasing problem you are describing. I checked with Magnus (the author of librdkafka) and he agreed that the savings seemed worth the cost of implementation. -Jason On Thu, Feb 16, 2017 at 4:32 PM, Ignacio Solis wrote: > -VarInts > > I'm one of the people (if not the most) opposed to VarInts. VarInts > have a place, but this is not it. (We had a large discussion about > them at the beginning of KIP-82 time) > > If anybody has real life performance numbers of VarInts improving > things or significantly reducing resources I would like to know what > that case may be. Yes, you can save some bytes here and there, but > this is probably insignificant to the overall system behavior and > storage requirements. -- I say this with respect to using VarInts in > the protocol itself, not as part of the data. 
> > VarInts require you to parse the Int before using it and depending on > the encoding they can suffer from aliasing (multiple representations > for the same value). > > Why add complexity? > > Nacho > > > On Thu, Feb 16, 2017 at 10:29 AM, Colin McCabe wrote: > > +1 for varints here-- it would save quite a bit of space. They are > > pretty quick to implement as well. > > > > I think it makes sense for values to be byte arrays. Users might want > > to attach arbitrary payloads; they shouldn't be forced to serialize > > everything to Java strings. > > > > best, > > Colin > > > > > > On Thu, Feb 16, 2017, at 09:52, Jason Gustafson wrote: > >> Hey Michael, > >> > >> Hmm, I guess the point of representing it as bytes is to allow the > broker > >> to pass it through opaquely? Is the cost of parsing them a concern, or > >> are > >> we simply trying to ensure that the broker stays agnostic to the format? > >> > >> On varints, I think adding support for them makes less sense for an > >> isolated use case, but as part of a more holistic change (such as what > we > >> have proposed in KIP-98), I think they are justifiable. If we add them, > >> then the need to use attributes becomes quite a bit weaker, right? The > >> other thing I find slightly odd is the fact that null headers has no > >> actual > >> semantic meaning for the message (unlike null keys and values). It is > >> just > >> a space optimization. It seems a bit better to always use size 0 to > >> indicate having no headers. > >> > >> Overall, the main point is ensuring that the message schema remains > >> consistent, either within the larger protocol, or at a minimum within > the > >> message itself. > >> > >> -Jason > >> > >> On Thu, Feb 16, 2017 at 6:39 AM, Michael Pearce > >> wrote: > >> > >> > Hi Jason, > >> > > >> > On point 1) in the message protocol the headers are simply a byte > array, > >> > as like the key or value, this is to clearly demarcate the header in > the > >> > core message. 
Then the header byte array in the core message is an > array of > >> > key, value pairs. This is what it is denoting. > >> > > >> > Then this would be I guess in the given notation: > >> > > >> > Headers => [KeyLength, Key, ValueLength, Value] > >> > KeyLength => int32 <-NEW size of the byte[] of the > >> > serialised key value > >> > Key => bytes <-- NEW serialised string (UTF8) > >> > bytes of the header key > >> > ValueLength => int32 <-- NEW size of the byte[] of the > >> > serialised header value > >> > Value => bytes < NEW serialised form of the > header > >> > value > >> > > >> > The key length and value length is matching the way the protocol is > >> > defined in the core message currently. > >> > > >> > > >> > > >> > > >> > On point 2) > >> > Var sized ints, this was discussed much earlier on, in fact I had > >> > suggested it myself (with Hadoop references), the complexity of this > >> > compared to having a simpler protocol was argued and agreed it wasn’t > worth > >> > the complexity as all other clients in other language
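For readers unfamiliar with the encoding under debate in this message, here is a minimal sketch of the base-128 varint scheme described in the protobuf encoding documentation linked in Jason's reply, plus the ZigZag mapping protobuf uses for signed values. The class and method names (`VarintSketch`, `encodeUnsigned`, `decodeUnsigned`, `zigZag`, `unZigZag`) are illustrative assumptions, not Kafka's or protobuf's API, and this is not the KIP-98 implementation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Illustrative sketch only; all names here are hypothetical.
final class VarintSketch {

    // Encodes a 32-bit value (treated as unsigned) as a base-128 varint:
    // 7 payload bits per byte, least significant group first, with the
    // high bit set on every byte except the last.
    static byte[] encodeUnsigned(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(5);
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    // Reads one varint from the buffer's current position (at most 5 bytes).
    static int decodeUnsigned(ByteBuffer in) {
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("varint is longer than 5 bytes");
    }

    // ZigZag maps small negative numbers to small unsigned ones
    // (0 -> 0, -1 -> 1, 1 -> 2, ...) so that values such as a -1
    // "null" marker still fit in a single byte.
    static int zigZag(int n) {
        return (n << 1) ^ (n >> 31);
    }

    static int unZigZag(int n) {
        return (n >>> 1) ^ -(n & 1);
    }
}
```

Under this scheme a length below 128 occupies one byte instead of the four bytes of an int32. The encoder above always emits the shortest form for a value. The decoder is lenient and would also accept an over-long encoding (for example the two bytes 0x80 0x00 for zero), which is one way the aliasing concern raised in the quoted message can arise; a stricter decoder might choose to reject such input.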
Re: [DISCUSS] KIP-82 - Add Record Headers
-VarInts I'm one of the people (if not the most) opposed to VarInts. VarInts have a place, but this is not it. (We had a large discussion about them at the beginning of KIP-82 time) If anybody has real life performance numbers of VarInts improving things or significantly reducing resources I would like to know what that case may be. Yes, you can save some bytes here and there, but this is probably insignificant to the overall system behavior and storage requirements. -- I say this with respect to using VarInts in the protocol itself, not as part of the data. VarInts require you to parse the Int before using it and depending on the encoding they can suffer from aliasing (multiple representations for the same value). Why add complexity? Nacho On Thu, Feb 16, 2017 at 10:29 AM, Colin McCabe wrote: > +1 for varints here-- it would save quite a bit of space. They are > pretty quick to implement as well. > > I think it makes sense for values to be byte arrays. Users might want > to attach arbitrary payloads; they shouldn't be forced to serialize > everything to Java strings. > > best, > Colin > > > On Thu, Feb 16, 2017, at 09:52, Jason Gustafson wrote: >> Hey Michael, >> >> Hmm, I guess the point of representing it as bytes is to allow the broker >> to pass it through opaquely? Is the cost of parsing them a concern, or >> are >> we simply trying to ensure that the broker stays agnostic to the format? >> >> On varints, I think adding support for them makes less sense for an >> isolated use case, but as part of a more holistic change (such as what we >> have proposed in KIP-98), I think they are justifiable. If we add them, >> then the need to use attributes becomes quite a bit weaker, right? The >> other thing I find slightly odd is the fact that null headers has no >> actual >> semantic meaning for the message (unlike null keys and values). It is >> just >> a space optimization. It seems a bit better to always use size 0 to >> indicate having no headers. 
>> >> Overall, the main point is ensuring that the message schema remains >> consistent, either within the larger protocol, or at a minimum within the >> message itself. >> >> -Jason >> >> On Thu, Feb 16, 2017 at 6:39 AM, Michael Pearce >> wrote: >> >> > Hi Jason, >> > >> > On point 1) in the message protocol the headers are simply a byte array, >> > as like the key or value, this is to clearly demarcate the header in the >> > core message. Then the header byte array in the core message is an array of >> > key, value pairs. This is what it is denoting. >> > >> > Then this would be I guess in the given notation: >> > >> > Headers => [KeyLength, Key, ValueLength, Value] >> > KeyLength => int32 <-NEW size of the byte[] of the >> > serialised key value >> > Key => bytes <-- NEW serialised string (UTF8) >> > bytes of the header key >> > ValueLength => int32 <-- NEW size of the byte[] of the >> > serialised header value >> > Value => bytes < NEW serialised form of the header >> > value >> > >> > The key length and value length is matching the way the protocol is >> > defined in the core message currently. >> > >> > >> > >> > >> > On point 2) >> > Var sized ints, this was discussed much earlier on, in fact I had >> > suggested it myself (with Hadoop references), the complexity of this >> > compared to having a simpler protocol was argued and agreed it wasn’t worth >> > the complexity as all other clients in other languages would need to ensure >> > theyre using the right var size algorithm, as there is a few. >> > >> > On point 3) >> > We did the attributes, optional approach as originally there was marked >> > concern that headers would cause a message size overhead for others, who >> > don’t want them. As such this is the clean solution to achieve that. If >> > that no longer holds, and we don’t care that we add 4bytes overhead, then >> > im happy to remove. 
>> > >> > I’m personally in favour of keeping the message as small as possible so >> > people don’t get shocks in perf and throughputs dues to message size, >> > unless they actively use the feature, as such I do prefer the attribute bit >> > wise feature flag approach myself. >> > >> > >> > >> > >> > On 16/02/2017, 05:40, "Jason Gustafson" wrote: >> > >> > We have proposed a few significant changes to the message format in >> > KIP-98 >> > which now seems likely to pass (perhaps with some iterations on >> > implementation details). It would be good to try and coordinate the >> > changes >> > in both of the proposals to make sure they are consistent and >> > compatible. >> > >> > I think using the attributes to indicate null headers is a reasonable >> > approach. We have proposed to do the same thing for the message key and >> > value. That said, I sympathize with Jay's argument. Having multiple >> > ways to >> > specify a null value increases the overall complexity of the protocol. >
Re: [DISCUSS] KIP-82 - Add Record Headers
+1 for varints here-- it would save quite a bit of space. They are pretty quick to implement as well. I think it makes sense for values to be byte arrays. Users might want to attach arbitrary payloads; they shouldn't be forced to serialize everything to Java strings. best, Colin On Thu, Feb 16, 2017, at 09:52, Jason Gustafson wrote: > Hey Michael, > > Hmm, I guess the point of representing it as bytes is to allow the broker > to pass it through opaquely? Is the cost of parsing them a concern, or > are > we simply trying to ensure that the broker stays agnostic to the format? > > On varints, I think adding support for them makes less sense for an > isolated use case, but as part of a more holistic change (such as what we > have proposed in KIP-98), I think they are justifiable. If we add them, > then the need to use attributes becomes quite a bit weaker, right? The > other thing I find slightly odd is the fact that null headers has no > actual > semantic meaning for the message (unlike null keys and values). It is > just > a space optimization. It seems a bit better to always use size 0 to > indicate having no headers. > > Overall, the main point is ensuring that the message schema remains > consistent, either within the larger protocol, or at a minimum within the > message itself. > > -Jason > > On Thu, Feb 16, 2017 at 6:39 AM, Michael Pearce > wrote: > > > Hi Jason, > > > > On point 1) in the message protocol the headers are simply a byte array, > > as like the key or value, this is to clearly demarcate the header in the > > core message. Then the header byte array in the core message is an array of > > key, value pairs. This is what it is denoting. 
> > > > Then this would be I guess in the given notation: > > > > Headers => [KeyLength, Key, ValueLength, Value] > > KeyLength => int32 <-NEW size of the byte[] of the > > serialised key value > > Key => bytes <-- NEW serialised string (UTF8) > > bytes of the header key > > ValueLength => int32 <-- NEW size of the byte[] of the > > serialised header value > > Value => bytes < NEW serialised form of the header > > value > > > > The key length and value length is matching the way the protocol is > > defined in the core message currently. > > > > > > > > > > On point 2) > > Var sized ints, this was discussed much earlier on, in fact I had > > suggested it myself (with Hadoop references), the complexity of this > > compared to having a simpler protocol was argued and agreed it wasn’t worth > > the complexity as all other clients in other languages would need to ensure > > theyre using the right var size algorithm, as there is a few. > > > > On point 3) > > We did the attributes, optional approach as originally there was marked > > concern that headers would cause a message size overhead for others, who > > don’t want them. As such this is the clean solution to achieve that. If > > that no longer holds, and we don’t care that we add 4bytes overhead, then > > im happy to remove. > > > > I’m personally in favour of keeping the message as small as possible so > > people don’t get shocks in perf and throughputs dues to message size, > > unless they actively use the feature, as such I do prefer the attribute bit > > wise feature flag approach myself. > > > > > > > > > > On 16/02/2017, 05:40, "Jason Gustafson" wrote: > > > > We have proposed a few significant changes to the message format in > > KIP-98 > > which now seems likely to pass (perhaps with some iterations on > > implementation details). It would be good to try and coordinate the > > changes > > in both of the proposals to make sure they are consistent and > > compatible. 
> > > > I think using the attributes to indicate null headers is a reasonable > > approach. We have proposed to do the same thing for the message key and > > value. That said, I sympathize with Jay's argument. Having multiple > > ways to > > specify a null value increases the overall complexity of the protocol. > > You > > can see this just from the fact that you need the extra verbiage in the > > protocol specification in this KIP and in KIP-98 to describe the > > dependence > > between the fields and the attributes. It seems like a slippery slope > > if > > you start allowing different request types to implement the protocol > > specification differently. > > > > You can also argue that the messages already are and are likely to > > remain a > > special case. For example, there is currently no generality in how > > compressed message sets are represented that would be applicable for > > other > > request types. Some might see this divergence as an unfortunate > > protocol > > deficiency which should be fixed; others might see it as sort of the > > inevitability of needing to optimize where it counts most. I'm probably > > somewhere in between, but I think we probabl
Re: [DISCUSS] KIP-82 - Add Record Headers
Hey Michael, Hmm, I guess the point of representing it as bytes is to allow the broker to pass it through opaquely? Is the cost of parsing them a concern, or are we simply trying to ensure that the broker stays agnostic to the format? On varints, I think adding support for them makes less sense for an isolated use case, but as part of a more holistic change (such as what we have proposed in KIP-98), I think they are justifiable. If we add them, then the need to use attributes becomes quite a bit weaker, right? The other thing I find slightly odd is the fact that null headers has no actual semantic meaning for the message (unlike null keys and values). It is just a space optimization. It seems a bit better to always use size 0 to indicate having no headers. Overall, the main point is ensuring that the message schema remains consistent, either within the larger protocol, or at a minimum within the message itself. -Jason On Thu, Feb 16, 2017 at 6:39 AM, Michael Pearce wrote: > Hi Jason, > > On point 1) in the message protocol the headers are simply a byte array, > as like the key or value, this is to clearly demarcate the header in the > core message. Then the header byte array in the core message is an array of > key, value pairs. This is what it is denoting. > > Then this would be I guess in the given notation: > > Headers => [KeyLength, Key, ValueLength, Value] > KeyLength => int32 <-NEW size of the byte[] of the > serialised key value > Key => bytes <-- NEW serialised string (UTF8) > bytes of the header key > ValueLength => int32 <-- NEW size of the byte[] of the > serialised header value > Value => bytes < NEW serialised form of the header > value > > The key length and value length is matching the way the protocol is > defined in the core message currently. 
> > > > > On point 2) > Var sized ints, this was discussed much earlier on, in fact I had > suggested it myself (with Hadoop references), the complexity of this > compared to having a simpler protocol was argued and agreed it wasn’t worth > the complexity as all other clients in other languages would need to ensure > theyre using the right var size algorithm, as there is a few. > > On point 3) > We did the attributes, optional approach as originally there was marked > concern that headers would cause a message size overhead for others, who > don’t want them. As such this is the clean solution to achieve that. If > that no longer holds, and we don’t care that we add 4bytes overhead, then > im happy to remove. > > I’m personally in favour of keeping the message as small as possible so > people don’t get shocks in perf and throughputs dues to message size, > unless they actively use the feature, as such I do prefer the attribute bit > wise feature flag approach myself. > > > > > On 16/02/2017, 05:40, "Jason Gustafson" wrote: > > We have proposed a few significant changes to the message format in > KIP-98 > which now seems likely to pass (perhaps with some iterations on > implementation details). It would be good to try and coordinate the > changes > in both of the proposals to make sure they are consistent and > compatible. > > I think using the attributes to indicate null headers is a reasonable > approach. We have proposed to do the same thing for the message key and > value. That said, I sympathize with Jay's argument. Having multiple > ways to > specify a null value increases the overall complexity of the protocol. > You > can see this just from the fact that you need the extra verbiage in the > protocol specification in this KIP and in KIP-98 to describe the > dependence > between the fields and the attributes. It seems like a slippery slope > if > you start allowing different request types to implement the protocol > specification differently. 
> > You can also argue that the messages already are and are likely to > remain a > special case. For example, there is currently no generality in how > compressed message sets are represented that would be applicable for > other > request types. Some might see this divergence as an unfortunate > protocol > deficiency which should be fixed; others might see it as sort of the > inevitability of needing to optimize where it counts most. I'm probably > somewhere in between, but I think we probably all share the intuition > that > the protocol should be kept as consistent as possible. With that in > mind, > here are a few comments: > > 1. One thing I found a little odd when reading the current proposal is > that > the headers are both represented as an array of bytes and as an array > of > key/value pairs. I'd probably suggest something like this: > > Headers => [HeaderKey HeaderValue] > HeaderKey => String > HeaderValue => Bytes > > An array in the Kafka protocol is represented as a 4-byte integer > i
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi Jason,

On point 1) In the message protocol the headers are simply a byte array, like the key or value; this is to clearly demarcate the headers in the core message. The header byte array in the core message is then an array of key, value pairs. This is what it is denoting.

In the given notation, I guess this would be:

Headers => [KeyLength, Key, ValueLength, Value]
  KeyLength => int32    <-- NEW size of the byte[] of the serialised key value
  Key => bytes          <-- NEW serialised string (UTF8) bytes of the header key
  ValueLength => int32  <-- NEW size of the byte[] of the serialised header value
  Value => bytes        <-- NEW serialised form of the header value

The key length and value length match the way the protocol is currently defined in the core message.

On point 2) Var sized ints: this was discussed much earlier on; in fact I had suggested it myself (with Hadoop references). The complexity of this compared to having a simpler protocol was argued, and it was agreed that it wasn't worth the complexity, as all clients in other languages would need to ensure they're using the right var size algorithm, as there are a few.

On point 3) We took the attributes, optional approach because originally there was marked concern that headers would cause a message size overhead for others who don't want them. As such this is the clean solution to achieve that. If that no longer holds, and we don't care that we add 4 bytes of overhead, then I'm happy to remove it.

I'm personally in favour of keeping the message as small as possible, so people don't get shocks in perf and throughput due to message size unless they actively use the feature; as such I do prefer the attribute bitwise feature flag approach myself.

On 16/02/2017, 05:40, "Jason Gustafson" wrote:

We have proposed a few significant changes to the message format in KIP-98 which now seems likely to pass (perhaps with some iterations on implementation details).
It would be good to try and coordinate the changes in both of the proposals to make sure they are consistent and compatible. I think using the attributes to indicate null headers is a reasonable approach. We have proposed to do the same thing for the message key and value. That said, I sympathize with Jay's argument. Having multiple ways to specify a null value increases the overall complexity of the protocol. You can see this just from the fact that you need the extra verbiage in the protocol specification in this KIP and in KIP-98 to describe the dependence between the fields and the attributes. It seems like a slippery slope if you start allowing different request types to implement the protocol specification differently. You can also argue that the messages already are and are likely to remain a special case. For example, there is currently no generality in how compressed message sets are represented that would be applicable for other request types. Some might see this divergence as an unfortunate protocol deficiency which should be fixed; others might see it as sort of the inevitability of needing to optimize where it counts most. I'm probably somewhere in between, but I think we probably all share the intuition that the protocol should be kept as consistent as possible. With that in mind, here are a few comments: 1. One thing I found a little odd when reading the current proposal is that the headers are both represented as an array of bytes and as an array of key/value pairs. I'd probably suggest something like this: Headers => [HeaderKey HeaderValue] HeaderKey => String HeaderValue => Bytes An array in the Kafka protocol is represented as a 4-byte integer indicating the number of elements in the array followed by the serialization of the elements. Unless I'm misunderstanding, what you have instead is the total size of the headers in bytes followed by the elements. I'm not sure I see any reason for this inconsistency. 2. 
In KIP-98, we've introduced variable-length integer fields. Effectively, we've enriched (or "complicated" as Jay might say ;) the protocol specification to include the following types: VarInt, VarLong, UnsignedVarInt and UnsignedVarLong. Along with these primitives, we could introduce the following types: VarSizeArray => NumberOfItems Item1 Item2 .. ItemN NumberOfItems => UnsignedVarInt VarSizeNullableArray => NumberOfItemsOrNull Item1 Item2 .. ItemN NumberOfItemsOrNull => VarInt (-1 means null) And similarly for the `String` and `Bytes` types. These types can save a considerable amount of space in this proposal because they can be used for both the number of headers included in the message and the lengths of the header keys and values. We could do this instead: Headers => VarSizeArray[HeaderKey
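To make the fixed-width layout from Michael's reply concrete (KeyLength, Key, ValueLength, Value, with int32 lengths), here is a small sketch that writes and reads the inner header blob. It is only an illustration under assumptions: the class name `FixedWidthHeadersSketch` is hypothetical, a `Map` is used so each key appears once, and how the blob is framed inside the outer message (a size prefix, an attribute bit) is deliberately left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only; not the KIP-82 implementation.
final class FixedWidthHeadersSketch {

    // Writes each header as: int32 key length, UTF-8 key bytes,
    // int32 value length, raw value bytes (big-endian, ByteBuffer's default).
    static byte[] serialize(Map<String, byte[]> headers) {
        int size = 0;
        for (Map.Entry<String, byte[]> h : headers.entrySet()) {
            size += 4 + h.getKey().getBytes(StandardCharsets.UTF_8).length
                  + 4 + h.getValue().length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (Map.Entry<String, byte[]> h : headers.entrySet()) {
            byte[] key = h.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putInt(key.length).put(key);
            buf.putInt(h.getValue().length).put(h.getValue());
        }
        return buf.array();
    }

    // Reads key/value pairs until the blob is exhausted.
    static Map<String, byte[]> deserialize(byte[] blob) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        ByteBuffer buf = ByteBuffer.wrap(blob);
        while (buf.hasRemaining()) {
            byte[] key = new byte[buf.getInt()];
            buf.get(key);
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            headers.put(new String(key, StandardCharsets.UTF_8), value);
        }
        return headers;
    }
}
```

Each header costs 8 bytes of length fields on top of its key and value bytes, which is the per-header overhead the varint discussion in this thread is trying to reduce.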
Re: [DISCUSS] KIP-82 - Add Record Headers
We have proposed a few significant changes to the message format in KIP-98 which now seems likely to pass (perhaps with some iterations on implementation details). It would be good to try and coordinate the changes in both of the proposals to make sure they are consistent and compatible.

I think using the attributes to indicate null headers is a reasonable approach. We have proposed to do the same thing for the message key and value. That said, I sympathize with Jay's argument. Having multiple ways to specify a null value increases the overall complexity of the protocol. You can see this just from the fact that you need the extra verbiage in the protocol specification in this KIP and in KIP-98 to describe the dependence between the fields and the attributes. It seems like a slippery slope if you start allowing different request types to implement the protocol specification differently.

You can also argue that the messages already are and are likely to remain a special case. For example, there is currently no generality in how compressed message sets are represented that would be applicable for other request types. Some might see this divergence as an unfortunate protocol deficiency which should be fixed; others might see it as sort of the inevitability of needing to optimize where it counts most. I'm probably somewhere in between, but I think we probably all share the intuition that the protocol should be kept as consistent as possible. With that in mind, here are a few comments:

1. One thing I found a little odd when reading the current proposal is that the headers are both represented as an array of bytes and as an array of key/value pairs. I'd probably suggest something like this:

Headers => [HeaderKey HeaderValue]
  HeaderKey => String
  HeaderValue => Bytes

An array in the Kafka protocol is represented as a 4-byte integer indicating the number of elements in the array followed by the serialization of the elements. Unless I'm misunderstanding, what you have instead is the total size of the headers in bytes followed by the elements. I'm not sure I see any reason for this inconsistency.

2. In KIP-98, we've introduced variable-length integer fields. Effectively, we've enriched (or "complicated" as Jay might say ;) the protocol specification to include the following types: VarInt, VarLong, UnsignedVarInt and UnsignedVarLong.

Along with these primitives, we could introduce the following types:

VarSizeArray => NumberOfItems Item1 Item2 .. ItemN
  NumberOfItems => UnsignedVarInt

VarSizeNullableArray => NumberOfItemsOrNull Item1 Item2 .. ItemN
  NumberOfItemsOrNull => VarInt (-1 means null)

And similarly for the `String` and `Bytes` types. These types can save a considerable amount of space in this proposal because they can be used for both the number of headers included in the message and the lengths of the header keys and values. We could do this instead:

Headers => VarSizeArray[HeaderKey HeaderValue]
  HeaderKey => VarSizeString
  HeaderValue => VarSizeBytes

Combining the savings from the use of variable length fields, the benefit of using the attributes to represent null seems pretty small.

3. Whichever way we go (whether we use the attributes or not), we should at least be consistent between this KIP and KIP-98. It would be very strange to have two ways to represent null values in the same schema. Either way is OK with me. I think some message-level optimizations are justifiable, but the savings here seem minimal (a few bytes per message), so maybe it's not worth the cost of letting the message diverge even further from the rest of the protocol.

-Jason

On Wed, Feb 15, 2017 at 8:52 AM, radai wrote:
> I've trimmed the inline contents as this mail is getting too big for the
> apache mailing list software to deliver :-(
>
> 1.
the important thing for interoperability is for different "interested > parties" (plugins, infra layers/wrappers, user-code) to be able to stick > pieces of metadata onto msgs without getting in each other's way. a common > key scheme (Strings, as of the time of this writing?) is all thats required > for that. it is assumed that the other end interested in any such piece of > metadata knows the encoding, and byte[] provides for the most flexibility. > i believe this is the same logic behind core kafka being byte[]/byte[] - > Strings are more "usable" but bytes are flexible and so were chosen. > Also - core kafka doesnt even do that good of a job on usability of the > payload (example - i have to specify the nop byte[] "decoders" explicitly > in conf), and again sacrificies usability for the sake of performance (no > convenient single-record processing as poll is a batch, lots of obscure > little config details exposing internals of the batching mechanism, etc) > > this is also why i really dislike the idea of a "type system" for header > values, it further degrades the usability, adds complexity and will > eventually get in people's way, also, it would be the 2nd/3rd hom
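The space argument in Jason's point 2 can be checked with some simple arithmetic. The sketch below compares the encoded size of a set of headers under a fixed-width layout and under the proposed `VarSizeArray` / `VarSizeString` / `VarSizeBytes` layout. It rests on assumptions that are not settled in the thread: the fixed-width variant is taken to be a 4-byte count followed by int32-prefixed keys and values, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Illustrative size arithmetic only; no bytes are actually written.
final class HeaderSizeSketch {

    // Number of bytes a base-128 varint needs for a non-negative value.
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    // Assumed fixed-width layout: int32 count, then per header
    // int32 key length + key bytes + int32 value length + value bytes.
    static int fixedWidthSize(Map<String, byte[]> headers) {
        int size = 4;
        for (Map.Entry<String, byte[]> h : headers.entrySet()) {
            size += 4 + utf8Length(h.getKey()) + 4 + h.getValue().length;
        }
        return size;
    }

    // Variable-length layout: varint count, then per header
    // varint key length + key bytes + varint value length + value bytes.
    static int varSize(Map<String, byte[]> headers) {
        int size = sizeOfUnsignedVarint(headers.size());
        for (Map.Entry<String, byte[]> h : headers.entrySet()) {
            int keyLength = utf8Length(h.getKey());
            int valueLength = h.getValue().length;
            size += sizeOfUnsignedVarint(keyLength) + keyLength
                  + sizeOfUnsignedVarint(valueLength) + valueLength;
        }
        return size;
    }

    private static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }
}
```

For a single header with the 8-byte key `trace-id` and a 16-byte value, this arithmetic gives 36 bytes for the fixed-width layout and 27 bytes for the variable-length one. With no headers at all it gives 4 bytes versus 1, which is why the attribute-bit optimization for "no headers" saves so little once varints are in place.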
Re: [DISCUSS] KIP-82 - Add Record Headers
I've trimmed the inline contents as this mail is getting too big for the apache mailing list software to deliver :-(

1. the important thing for interoperability is for different "interested parties" (plugins, infra layers/wrappers, user-code) to be able to stick pieces of metadata onto msgs without getting in each other's way. a common key scheme (Strings, as of the time of this writing?) is all that's required for that. it is assumed that the other end interested in any such piece of metadata knows the encoding, and byte[] provides for the most flexibility. i believe this is the same logic behind core kafka being byte[]/byte[] - Strings are more "usable" but bytes are flexible and so were chosen.
Also - core kafka doesn't even do that good of a job on usability of the payload (example - i have to specify the nop byte[] "decoders" explicitly in conf), and again sacrifices usability for the sake of performance (no convenient single-record processing as poll is a batch, lots of obscure little config details exposing internals of the batching mechanism, etc)

this is also why i really dislike the idea of a "type system" for header values: it further degrades the usability, adds complexity and will eventually get in people's way. also, it would be the 2nd/3rd home-grown serialization mechanism in core kafka (counting 2 iterations of the "type definition DSL")

2. this is an implementation detail, and not even a very "user facing" one? to the best of my understanding the vote process is on proposed API/behaviour. also - since we're willing to go with strings, just serialize a 0-sized header blob and IIUC you don't need any optionals anymore.

3. yes, we can :-)

On Tue, Feb 14, 2017 at 11:56 PM, Michael Pearce wrote:
> Hi Jay,
>
> 1) There was some initial debate on the value part, as you'll note String,
> String headers were discounted early on. The reason for this is flexibility
> and keeping in line with the flexibility of key, value of the message
> object itself.
> I don't think it takes away from an ecosystem, as each plugin
> will care for its own key; this way ints, booleans and exotic custom binary
> can all be catered for.
>   a. If you really wanted to push for a typed value interface, I wouldn't
>      want just String values supported, but the primitives plus string, while
>      also still keeping the ability to have a binary for custom binaries that
>      some organisations may have.
>      i. I have written this slight alternative here: https://cwiki.apache.org/
>         confluence/display/KAFKA/KIP-82+-+Add+Record+Headers+-+Typed
>      ii. Essentially the value bytes have a leading byte of overhead.
>         1. This tells you what type the value is before reading the rest of the
>            bytes, allowing serialisation/deserialisation to and from the primitives,
>            string and byte[]. This is akin to some other messaging systems.
> 2) We are making it optional, so that those not wanting headers have 0
> bytes overhead (think of it as a feature flag). I don't think this is
> complex, especially if comparing to changes proposed in other KIPs like
> KIP-98.
>   a. If you really really don't like this, we can drop it, but it would mean
>      buying into 4 bytes extra overhead for users who do not want to use headers.
> 3) In the summary yes, it is at a higher level, but I think this is well
> documented in the proposed changes section.
>   a. Added getHeaders method to Producer/Consumer record (that is it)
>   b. We've also detailed the new Headers class that this method returns, which
>      encapsulates the headers protocol and logic.
>
> Best,
> Mike
>
> ==Original questions from the vote thread from Jay.==
>
> Couple of things I think we still need to work out:
>
>    1. I think we agree about the key, but I think we haven't talked about
>    the value yet. I think if our goal is an open ecosystem of these header
>    spread across many plugins from many systems we should consider making this
>    a string as well so it can be printed, set via a UI, set in config, etc.
>Basically encouraging pluggable serialization formats here will lead to > a >bit of a tower of babel. >2. This proposal still includes a pretty big change to our serialization >and protocol definition layer. Essentially it is introducing an optional >type, where the format is data dependent. I think this is actually a big >change though it doesn't seem like it. It means you can no longer > specify >this type with our type definition DSL, and likewise it requires custom >handling in client libs. This isn't a huge thing, since the Record >definition is custom anyway, but I think this kind of protocol >inconsistency is very non-desirable and ties you to hand-coding things. > I >think the type should instead by [Key Value] in our BNF, where key and >value are both short strings as used elsewhere. This brings it in line > with >the rest of the protocol. >3. Could we get more specific about the exact Java API change to >ProducerRecord, Co
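The typed-value alternative mentioned in Michael's quoted reply (a leading byte that says what type the rest of the value bytes hold) can be sketched roughly as follows. This is only an illustration of the idea: the tag values, the set of supported types and the names `TypedHeaderValueSketch`, `encode` and `decode` are assumptions made here, not taken from the linked wiki page.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only; tag values and names are hypothetical.
final class TypedHeaderValueSketch {

    static final byte TYPE_BYTES = 0;
    static final byte TYPE_STRING = 1;
    static final byte TYPE_INT = 2;
    static final byte TYPE_BOOLEAN = 3;

    // Prefixes the serialized value with a one-byte type tag.
    static byte[] encode(Object value) {
        if (value instanceof byte[]) {
            return tag(TYPE_BYTES, (byte[]) value);
        }
        if (value instanceof String) {
            return tag(TYPE_STRING, ((String) value).getBytes(StandardCharsets.UTF_8));
        }
        if (value instanceof Integer) {
            return tag(TYPE_INT, ByteBuffer.allocate(4).putInt((Integer) value).array());
        }
        if (value instanceof Boolean) {
            return tag(TYPE_BOOLEAN, new byte[] {(byte) ((Boolean) value ? 1 : 0)});
        }
        throw new IllegalArgumentException("unsupported type: " + value.getClass());
    }

    // Reads the tag byte first, then interprets the remaining bytes accordingly.
    static Object decode(byte[] encoded) {
        ByteBuffer payload = ByteBuffer.wrap(encoded, 1, encoded.length - 1);
        switch (encoded[0]) {
            case TYPE_BYTES:
                byte[] raw = new byte[payload.remaining()];
                payload.get(raw);
                return raw;
            case TYPE_STRING:
                return new String(encoded, 1, encoded.length - 1, StandardCharsets.UTF_8);
            case TYPE_INT:
                return payload.getInt();
            case TYPE_BOOLEAN:
                return payload.get() != 0;
            default:
                throw new IllegalArgumentException("unknown type tag: " + encoded[0]);
        }
    }

    private static byte[] tag(byte type, byte[] payload) {
        byte[] out = new byte[payload.length + 1];
        out[0] = type;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }
}
```

The trade-off argued over in this thread shows up directly: every value grows by one byte and every client has to agree on the tag table, in exchange for values that tools can print or edit without knowing which plugin wrote them.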
Re: [DISCUSS] KIP-82 - Add Record Headers
while HTTP-style (string, string) headers are the most common and most familiar, there is a very significant impact on msg size, especially given that some payloads are literally a few integers (think stock quotes) and would be dwarfed by an http-like header segment.

I think we're ok with not allowing for repeatable headers because:

1. either they're set by multiple parties that are clashing by mistake - in this case downstream parsers will fail no matter what we do, as the payloads in those headers are presumably different
2. or they're set by the same "component"/system/plugin, in which case the owner of the code can read-modify-write the same single header.

On Tue, Jan 10, 2017 at 10:38 PM, Ewen Cheslack-Postava wrote:
> not necessarily advocating for supporting this, just suggesting that we
> think about it. As you add these features and get closer to mapping to
> other systems, people will inevitably try to map them. Headers are an area
> where, if we're going to add them, it's worth considering compatibility as
> someone will inevitably come and complain that system X does Y with headers
> and we should also support Y because any decent system that provides
> headers will do so.
>
Re: [DISCUSS] KIP-82 - Add Record Headers
The details about headers for control messages are still to be defined. But yes, the idea is to have some common default behavior that clients would need to implement.

The point is that "regular headers" add metadata to regular messages. Thus, those messages will be returned to the user via .poll(). And after the message is received the user can check if metadata is present and read it.

For control messages, we do not want those to pop up via .poll() as those are not regular messages. A client would need to opt in to see those messages (either via poll() or maybe a callback).

Thus, we need some special (standardized) header IDs that indicate control messages that should not be returned to the user via poll() by default.

-Matthias

On 12/17/16 9:37 PM, Roger Hoover wrote:
> Matthias,
>
> Thanks for your input. I'm +1 on control messages as they seem to be the
> simplest way to implement watermarks (
> https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), a
> feature that would add a lot of value to Kafka Streams IMHO.
>
> Your argument that the control-message indicator belongs in the client-only
> section of the record format makes sense. Just to make sure I understand,
> are you suggesting that control messages would be indicated by a standard
> reserved header? By standard, I mean that ALL Kafka consumers would know
> to handle these messages differently (possibly just ignoring them). This
> would need to be added to the specification of the consumer protocol so
> that all Kafka clients implement it, right? I think it's a good idea but
> just checking.
>
> Cheers,
>
> Roger
>
>
> On Wed, Dec 14, 2016 at 9:51 AM, Matthias J. Sax
> wrote:
>
>> Yes and no. I did overload the term "control message".
>>
>> EOS control messages are for client-broker communication and thus never
>> exposed to any application. And I think this is a good design because
>> broker needs to understand those control messages. Thus, this should be
>> a protocol change.
>> >> The type of control messages I have in mind are for client-client >> (application-application) communication and the broker is agnostic to >> them. Thus, it should not be a protocol change. >> >> >> -Matthias >> >> >> >> On 12/14/16 9:42 AM, radai wrote: >>> arent control messages getting pushed as their own top level protocol >>> change (and a fairly massive one) for the transactions KIP ? >>> >>> On Tue, Dec 13, 2016 at 5:54 PM, Matthias J. Sax >>> wrote: >>> Hi, I want to add a completely new angle to this discussion. For this, I want to propose an extension for the headers feature that enables new uses cases -- and those new use cases might convince people to support headers (of course including the larger scoped proposal). Extended Proposal: Allow messages with a certain header key to be special "control messages" (w/ o w/o payload) that are not exposed to an application via .poll(). Thus, a consumer client would automatically skip over those messages. If an application knows about embedded control messages, it can "sing up" to those messages by the consumer client and either get a callback or the consumer auto-drop for this messages gets disabled (allowing to consumer those messages via poll()). (The details need further considerations/discussion. I just want to sketch the main idea.) Usage: There is a shared topic (ie, used by multiple applications) and a producer application wants to embed a special message in the topic for a dedicated consumer application. Because only one application will understand this message, it cannot be a regular message as this would break all applications that do not understand this message. The producer application would set a special metadata key and no consumer application would see this control message by default because they did not enable their consumer client to return this message in poll() (and the client would just drop this message with special metadata key). 
Only the single application that should receive this message, will subscribe to this message on its consumer client and process it. Concrete Use Case: Kafka Streams In Kafka Streams, we would like to propagate "control messages" from subtopology to subtopology. There are multiple scenarios for which this would be useful. For example, currently we do not guarantee a "consistent shutdown" of an application. By this, I mean that input records might not be completely processed by the whole topology because the application shutdown happens "in between" and an intermediate result topic gets "stock" in an intermediate topic. Thus, a user would see an committed offset of the source topic of the application, but no corresponding result record in the output topic. Having "shutdown markers" would all
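As a rough illustration of the default behavior described in this message, the sketch below models a client that withholds control messages from poll() unless the application opts in, either through a callback or by disabling the auto-drop. It is plain Python rather than the Kafka client API. The `__control` header key, the `Record` type, and the `poll` signature are all assumptions made for illustration, since the thread leaves the actual reserved header IDs undefined.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, List, Optional

# Hypothetical reserved header key; the thread only says that such IDs
# would have to be standardized, not what they would be called.
CONTROL_HEADER = "__control"


@dataclass
class Record:
    """Toy stand-in for a consumed record: a value plus its headers."""
    value: bytes
    headers: Dict[str, bytes] = field(default_factory=dict)


def is_control(record: Record) -> bool:
    """A record counts as a control message if it carries the reserved key."""
    return CONTROL_HEADER in record.headers


def poll(
    fetched: Iterable[Record],
    include_control: bool = False,
    on_control: Optional[Callable[[Record], None]] = None,
) -> List[Record]:
    """Return the records an application would see from one poll() call.

    Control messages are dropped by default. An application opts in either
    with a callback (on_control) or by asking for them inline (include_control).
    """
    delivered: List[Record] = []
    for record in fetched:
        if is_control(record):
            if on_control is not None:
                on_control(record)        # opt-in via callback
            elif include_control:
                delivered.append(record)  # opt-in via poll()
            # otherwise the control message is silently skipped
            continue
        delivered.append(record)
    return delivered
```

An application that never opts in sees only regular records, which is the property that would let a producer embed control messages in a shared topic without breaking other consumers.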
Re: [DISCUSS] KIP-82 - Add Record Headers
Matthias,

Thanks for your input. I'm +1 on control messages as they seem to be the simplest way to implement watermarks (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), a feature that would add a lot of value to Kafka Streams IMHO.

Your argument that the control-message indicator belongs in the client-only section of the record format makes sense. Just to make sure I understand, are you suggesting that control messages would be indicated by a standard reserved header? By standard, I mean that ALL Kafka consumers would know to handle these messages differently (possibly just ignoring them). This would need to be added to the specification of the consumer protocol so that all Kafka clients implement it, right? I think it's a good idea but just checking.

Cheers,

Roger

On Wed, Dec 14, 2016 at 9:51 AM, Matthias J. Sax wrote:
> Yes and no. I did overload the term "control message".
>
> EOS control messages are for client-broker communication and thus never exposed to any application. And I think this is a good design because the broker needs to understand those control messages. Thus, this should be a protocol change.
>
> The type of control message I have in mind is for client-client (application-application) communication and the broker is agnostic to them. Thus, it should not be a protocol change.
>
> -Matthias
Re: [DISCUSS] KIP-82 - Add Record Headers
@Matthias - oh.

I think over the course of this thread enough use cases have been presented for things that can be done/solved with headers that, even if every single potential use case has a better custom implementation (which I don't believe), headers are clearly one of the best possible Kafka modifications in terms of "bang for your buck"/ROI.

On Thu, Dec 15, 2016 at 5:08 PM, Jun Rao wrote:
> Hi, Michael,
>
> Thanks for the response.
>
> 100. Is there any other metadata associated with the uuid that APM sends to the central coordinator? What kind of things could you do once the tracing is embedded in each message?
>
> 103. How do you preserve the per-key ordering when switching to a different DC at IG? Are you doing 2-way mirroring?
>
> 105. Got it. So, you don't need to use headers for encryption itself. But if there is another use case for headers, it's hard to put that info into the encrypted payload.
>
> 106. Embedding all metadata instead of just the producer id per message is likely more verbose, right? Similarly, in 100 above, only a uuid is embedded in each message.
>
> 107. Yes, this kind of UUID is proposed in KIP-98 for deduping.
>
> Jun
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi, Michael,

Thanks for the response.

100. Is there any other metadata associated with the uuid that APM sends to the central coordinator? What kind of things could you do once the tracing is embedded in each message?

103. How do you preserve the per-key ordering when switching to a different DC at IG? Are you doing 2-way mirroring?

105. Got it. So, you don't need to use headers for encryption itself. But if there is another use case for headers, it's hard to put that info into the encrypted payload.

106. Embedding all metadata instead of just the producer id per message is likely more verbose, right? Similarly, in 100 above, only a uuid is embedded in each message.

107. Yes, this kind of UUID is proposed in KIP-98 for deduping.

Jun

On Thu, Dec 8, 2016 at 12:12 AM, Michael Pearce wrote:
> Hi Jun
>
> 100) Each time a transaction exits a JVM for a remote system (HTTP/JMS/hopefully one day Kafka), the APM tools stitch in a unique id (though I believe it contains the end2end UUID embedded in this id). On receiving the message at the receiving JVM, the APM code takes this out and continues its tracing on that new thread. Both JVMs (and other languages the APM tool supports) send this data asynchronously back to the central controllers, where the stitching together occurs. For this they need some header space in which to put this id.
>
> 101) Yes, indeed we have a business transaction id in the payload. This, though, is system-level tracing that we need to marry up. Also, as per the note on end2end encryption, we’d be unable to prove the flow if the payload is encrypted, as we’d not have access to it at certain points of the flow through the infrastructure/platform.
>
> 103) As said, we use this mechanism in IG very successfully. As stated, per key we guarantee that the transaction-producing app handles the transactions of a key at one DC, except at a point of critical failure where we have to flip processing to another. We care about key ordering.
> I disagree on the offset comment for the partition solution: unless you do full ISR or expensive full XA transactions, even with partitions you cannot fully guarantee offsets would match.
>
> 105) Very much so. I need to have access at the platform level to all the other metadata mentioned, without needing access to the encryption keys of the payload.
>
> 106) Technically yes for AZ/region/cluster, but then we’d need a global producerId register, which would be very hard to keep current and correct, just to understand a message's region/AZ/cluster of origin for routing.
> For the client wrapper version, the producerId can stay the same while the producer upgrades its wrapper, so we need to know which wrapper version the message was created with.
> Likewise for the IP address: as stated, our producer can move, in which case its IP would change.
>
> 107) The UUID is set on the message by interceptors before the actual producer transport send. This is for a platform-level message dedupe guarantee; the business payload should be agnostic to it. Please see https://activemq.apache.org/artemis/docs/1.5.0/duplicate-detection.html and note that this does not touch business payloads.
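Point 100 in Michael's quoted reply (an APM tool stitching a tracing id into outgoing messages and taking it out again on the receiving side) can be pictured with the small sketch below. It is a plain-Python model, not an APM or Kafka API: the `apm.trace-id` header key and both function names are hypothetical, and headers are represented as a simple dict.

```python
import uuid
from typing import Dict, Optional

# Hypothetical header key reserved for the tracing tool.
TRACE_HEADER = "apm.trace-id"


def inject_trace(headers: Dict[str, bytes], trace_id: Optional[str] = None) -> str:
    """Sending side: write the trace id into the headers before the send.

    An existing trace id is propagated; otherwise a new one is started.
    """
    trace_id = trace_id or uuid.uuid4().hex
    headers[TRACE_HEADER] = trace_id.encode("utf-8")
    return trace_id


def extract_trace(headers: Dict[str, bytes]) -> Optional[str]:
    """Receiving side: take the trace id out so tracing can continue there.

    Returns None when the message carried no trace id.
    """
    raw = headers.pop(TRACE_HEADER, None)
    return raw.decode("utf-8") if raw is not None else None
```

Because the id travels in the headers, neither side needs to parse or decrypt the payload, which is the property points 101 and 105 rely on.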
Re: [DISCUSS] KIP-82 - Add Record Headers
Yes and no. I did overload the term "control message".

EOS control messages are for client-broker communication and thus never exposed to any application. And I think this is a good design because the broker needs to understand those control messages. Thus, this should be a protocol change.

The type of control message I have in mind is for client-client (application-application) communication and the broker is agnostic to them. Thus, it should not be a protocol change.

-Matthias

On 12/14/16 9:42 AM, radai wrote:
> aren't control messages getting pushed as their own top level protocol change (and a fairly massive one) for the transactions KIP?
Re: [DISCUSS] KIP-82 - Add Record Headers
aren't control messages getting pushed as their own top level protocol change (and a fairly massive one) for the transactions KIP?

On Tue, Dec 13, 2016 at 5:54 PM, Matthias J. Sax wrote:
> Hi,
>
> I want to add a completely new angle to this discussion. For this, I want to propose an extension for the headers feature that enables new use cases -- and those new use cases might convince people to support headers (of course including the larger scoped proposal).
>
> Extended Proposal:
>
> Allow messages with a certain header key to be special "control messages" (w/ or w/o payload) that are not exposed to an application via .poll().
>
> Thus, a consumer client would automatically skip over those messages. If an application knows about embedded control messages, it can "sign up" for those messages with the consumer client and either get a callback, or the consumer's auto-drop for these messages gets disabled (allowing it to consume those messages via poll()).
>
> (The details need further consideration/discussion. I just want to sketch the main idea.)
>
> Usage:
>
> There is a shared topic (i.e., used by multiple applications) and a producer application wants to embed a special message in the topic for a dedicated consumer application. Because only one application will understand this message, it cannot be a regular message, as this would break all applications that do not understand it. The producer application would set a special metadata key, and no consumer application would see this control message by default because they did not enable their consumer client to return this message in poll() (the client would just drop a message with the special metadata key). Only the single application that should receive this message will subscribe to it on its consumer client and process it.
>
> Concrete Use Case: Kafka Streams
>
> In Kafka Streams, we would like to propagate "control messages" from subtopology to subtopology. There are multiple scenarios for which this would be useful. For example, currently we do not guarantee a "consistent shutdown" of an application. By this, I mean that input records might not be completely processed by the whole topology because the application shutdown happens "in between" and an intermediate result gets "stuck" in an intermediate topic. Thus, a user would see a committed offset for the source topic of the application, but no corresponding result record in the output topic.
>
> Having "shutdown markers" would allow us to first stop the upstream subtopology and write this marker into the intermediate topic; the downstream subtopology would only shut itself down after it sees the "shutdown marker". Thus, we can guarantee on shutdown that no "in-flight" messages get stuck in intermediate topics.
>
> A similar usage would be for KIP-95 (Incremental Batch Processing). There was a discussion about the proposed metadata topic, and we could avoid this metadata topic if we had "control messages".
>
> Right now, we cannot insert an "application control message" because Kafka Streams does not own all topics it reads/writes and thus might break other consumer applications (as described above) if we inject random messages that are not understood by other apps.
>
> Of course, one can work around "embedded control messages" by using an additional topic to propagate control messages between applications (as suggested in KIP-95 via a metadata topic for Kafka Streams). But there are major concerns about adding this metadata topic in the KIP, and this shows that other applications that need a similar pattern might profit from topic-embedded "control messages", too.
>
> One last important consideration: those "control messages" are used for client-to-client communication and are not understood by the broker. Thus, those messages should not be enabled within the message format (cf. tombstone flag -- KIP-87). However, "client land" record headers would be a nice way to implement them. Because KIP-82 did consider key namespaces for metadata keys, this extension should not be its own KIP but should be included in KIP-82 to reserve a namespace for "control messages" in the first place.
>
> Sorry for the long email... Looking forward to your feedback.
>
> -Matthias
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi,

I want to add a completely new angle to this discussion. For this, I want to propose an extension for the headers feature that enables new use cases -- and those new use cases might convince people to support headers (of course including the larger scoped proposal).

Extended Proposal:

Allow messages with a certain header key to be special "control messages" (w/ or w/o payload) that are not exposed to an application via .poll().

Thus, a consumer client would automatically skip over those messages. If an application knows about embedded control messages, it can "sign up" for those messages with the consumer client and either get a callback, or the consumer's auto-drop for these messages gets disabled (allowing it to consume those messages via poll()).

(The details need further consideration/discussion. I just want to sketch the main idea.)

Usage:

There is a shared topic (i.e., used by multiple applications) and a producer application wants to embed a special message in the topic for a dedicated consumer application. Because only one application will understand this message, it cannot be a regular message, as this would break all applications that do not understand it. The producer application would set a special metadata key, and no consumer application would see this control message by default because they did not enable their consumer client to return this message in poll() (the client would just drop a message with the special metadata key). Only the single application that should receive this message will subscribe to it on its consumer client and process it.

Concrete Use Case: Kafka Streams

In Kafka Streams, we would like to propagate "control messages" from subtopology to subtopology. There are multiple scenarios for which this would be useful. For example, currently we do not guarantee a "consistent shutdown" of an application. By this, I mean that input records might not be completely processed by the whole topology because the application shutdown happens "in between" and an intermediate result gets "stuck" in an intermediate topic. Thus, a user would see a committed offset for the source topic of the application, but no corresponding result record in the output topic.

Having "shutdown markers" would allow us to first stop the upstream subtopology and write this marker into the intermediate topic; the downstream subtopology would only shut itself down after it sees the "shutdown marker". Thus, we can guarantee on shutdown that no "in-flight" messages get stuck in intermediate topics.

A similar usage would be for KIP-95 (Incremental Batch Processing). There was a discussion about the proposed metadata topic, and we could avoid this metadata topic if we had "control messages".

Right now, we cannot insert an "application control message" because Kafka Streams does not own all topics it reads/writes and thus might break other consumer applications (as described above) if we inject random messages that are not understood by other apps.

Of course, one can work around "embedded control messages" by using an additional topic to propagate control messages between applications (as suggested in KIP-95 via a metadata topic for Kafka Streams). But there are major concerns about adding this metadata topic in the KIP, and this shows that other applications that need a similar pattern might profit from topic-embedded "control messages", too.

One last important consideration: those "control messages" are used for client-to-client communication and are not understood by the broker. Thus, those messages should not be enabled within the message format (cf. tombstone flag -- KIP-87). However, "client land" record headers would be a nice way to implement them. Because KIP-82 did consider key namespaces for metadata keys, this extension should not be its own KIP but should be included in KIP-82 to reserve a namespace for "control messages" in the first place.

Sorry for the long email... Looking forward to your feedback.

-Matthias
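The "shutdown marker" idea from the Kafka Streams use case can be sketched roughly as follows. This is a toy model under stated assumptions, not Kafka Streams code: the intermediate topic is an in-memory deque, messages are (headers, value) tuples, and the `__control.shutdown` key and both function names are hypothetical.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple

# Hypothetical reserved header key marking a shutdown control message.
SHUTDOWN_HEADER = "__control.shutdown"

Message = Tuple[Dict[str, bytes], bytes]  # (headers, value)


def stop_upstream(intermediate: Deque[Message]) -> None:
    """Upstream subtopology stops and writes the marker as its final message."""
    intermediate.append(({SHUTDOWN_HEADER: b""}, b""))


def drain_downstream(intermediate: Deque[Message]) -> Tuple[List[bytes], bool]:
    """Downstream subtopology keeps processing until it sees the marker.

    Returns the processed values and whether the marker was reached. A real
    consumer would keep polling while the topic is empty; this toy version
    simply stops when the in-memory queue runs out.
    """
    processed: List[bytes] = []
    while intermediate:
        headers, value = intermediate.popleft()
        if SHUTDOWN_HEADER in headers:
            return processed, True   # safe to shut down: nothing is in flight
        processed.append(value)      # stand-in for the real processing step
    return processed, False          # marker not seen yet
```

Since the marker is written after the last regular record, seeing it tells the downstream side that everything ahead of it in the intermediate topic has been processed.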
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi, Michael,

Thanks for the reply. I find it very helpful.

Data lineage:
100. I'd like to understand the APM use case a bit more. It sounds like those APM plugins can generate a transaction id that we could potentially put in the header of every message. How would you typically make use of such transaction ids? Is there other metadata associated with the transaction id and, if so, how is it propagated downstream?
101. For the finance use case, if the concept of a transaction is important, wouldn't it typically be included in the message payload instead of as an optional header field?
102. The data lineage that Atlas and Navigator support seems to be at the dataset level, not per-record level? So, not sure if per-message headers are relevant there.

Mirroring:
103. The benefit of using separate partitions is that it potentially makes it easy to preserve offsets during mirroring. This will make it easier for consumers to switch clusters. Currently, consumers can switch clusters by using the timestampToOffset() api, but they have to deal with duplicates. Good point on the issue with log compaction, and I am not sure how to address this. However, even if we mirror into the existing partitions, the ordering of messages generated from different clusters seems non-deterministic anyway. So, it seems that the consumers already have to deal with that? If a topic is compacted, does that mean which messages are preserved is also non-deterministic across clusters?
104. Good point on partition key.

End-to-end encryption:
105. So, it seems end-to-end encryption is useful. Are headers useful there?

Auditing:
106. It seems that, other than the UUID, all other metadata is per producer?

EOS:
107. How are those UUIDs generated? I am not sure if they can be generated in the producer library. An application may send messages through a load balancer and, on retry, the same message could be routed to a different producer instance. So, it seems that the application has to generate the UUIDs.
In that case, shouldn't the application just put the UUID in the payload?

Thanks,

Jun

On Fri, Dec 2, 2016 at 4:57 PM, Michael Pearce wrote:
> Hi Jun.
>
> Per Transaction Tracing / Data Lineage.
>
> As stated in the KIP, this is the first use case: it is how many APM tools now work. I would find it impossible for anyone to argue this is not important or a niche market, as it has its own Gartner report for this space. Such companies as AppDynamics, New Relic, Dynatrace, Hawkular are but a few.
>
> Likewise these APM tools can help very rapidly track down issues and automatically capture metrics, and perform actions based on unexpected behavior to auto-recover services.
>
> Before anyone mentions looking at aggregated stats: on critical flows we cannot afford to have only aggregated, rolled-up stats.
>
> The APM tool we use is actually able to detect a single transaction failure and capture the thread traces in the JVM where it failed and everything for us, to the point that it sends us alerts giving the line number of the code that caused it and the transaction trace through all the services and endpoints (supported) up to the point of failure; it can also capture the data in and out (so we can replay). Because at the moment Kafka doesn't support us being able to stitch in these tracing transaction ids natively, we cannot get these benefits, which limits our ability to support apps and monitor them to the same standards we have come to expect when they are on a Kafka flow.
>
> This actually ties in with Data Lineage, as the same tracing can be used to back-stitch this. Essentially, many times due to the sums of money involved there are disputes, and typically as a financial institution the easiest and cleanest way to prove things when disputes arise is to present the actual flow and processes involved in a transaction.
>
> Likewise, as Hadoop matures it's evident this case is important, as shown by tools such as Atlas (Hortonworks led) and Navigator (Cloudera led). I also believe the importance here is very much NOT just a financial issue.
>
> From an MDM point of view, for any company wanting to care about Data Quality and Data Governance, Data Lineage is a key piece in this puzzle.
>
> RE Mirroring,
>
> As per the KIP, this is in fact exactly what we do re cluster id, to mirror a network of clusters between AZs / Regions. We know a transaction for a key will be done within an AZ/Region, and as such we know the write to Kafka would be ordered per key. But we need an eventual view of that across our other regions/AZs. When we have a complete AZ or Region failure we know there will be a brief interruption whilst those transactions are moved to another region, but we expect things to continue after it.
>
> As mentioned, having separate partitions to do this starts to get ugly/complicated for us:
> How would I do compaction where a key is in two partitions?
> How do we balance consumers so where
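To make the APM tracing use case in the messages above concrete, here is a small sketch of how a trace id could travel in header space rather than in the payload. The header map, the `apm.trace-id` key, and both method names are assumptions made for illustration; they are not taken from any APM vendor or from the KIP.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch; headers are modelled as a plain map, not the Kafka client API.
final class TraceHeaderSketch {

    // Assumed header key for the tracing id (illustrative only).
    static final String TRACE_KEY = "apm.trace-id";

    // Sending side: stitch the current trace id into a copy of the headers.
    // If no transaction is in flight, a new trace is started with a random UUID.
    static Map<String, byte[]> stitch(Map<String, byte[]> headers, String currentTraceId) {
        Map<String, byte[]> out = new HashMap<>(headers);
        String id = currentTraceId != null ? currentTraceId : UUID.randomUUID().toString();
        out.put(TRACE_KEY, id.getBytes(StandardCharsets.UTF_8));
        return out;
    }

    // Receiving side: read the id without touching the (possibly encrypted) payload.
    static String extract(Map<String, byte[]> headers) {
        byte[] raw = headers.get(TRACE_KEY);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> sent = stitch(new HashMap<>(), "tx-42");
        System.out.println(extract(sent)); // prints tx-42
    }
}
```

The receiving side never deserializes the value, which is why the same approach would still work at points in the flow where the payload is encrypted end to end.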
Re: [DISCUSS] KIP-82 - Add Record Headers
Hi James,

Yes, that is exactly what we do. Alas, it's bespoke to our internal wrapper at the moment, so we cannot share it. This is the ecosystem argument: with native headers we would be able to share these kinds of things.

Cheers
Mike

From: James Cheng
Sent: Monday, December 5, 2016 8:50:30 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

> On Dec 2, 2016, at 4:57 PM, Michael Pearce wrote:
>
> Hi Jun.
>
> RE Mirroring,
>
> [...]
>
> Lastly around mirroring we have a partitionKey field, as the key used for partitioning logic != compaction key all the time, but we want to preserve it for when we mirror so that if source cluster partition count != destination cluster partition count we can honour the same partitioning logic.

Michael,

Sorry to be off topic about the KIP, but this is genius. I'm totally stealing this idea. We currently require partition counts to be the same on source and destination cluster and do 1-to-1 mapping of partitions (i.e. partition 1 goes to partition 1, partition 2 goes to partition 2, etc) because otherwise mirrormaker would partition based off of the compaction key. Explicitly including the partition key would let us work around that.

Does that mean you have a custom partitioner in mirrormaker that uses the partitionKey field? Is it implemented inside a MirrorMakerMessageHandler?

Thanks!
-James
Re: [DISCUSS] KIP-82 - Add Record Headers
> On Dec 2, 2016, at 4:57 PM, Michael Pearce wrote:
>
> Hi Jun.
>
> RE Mirroring,
>
> [...]
>
> Lastly around mirroring we have a partitionKey field, as the key used for partitioning logic != compaction key all the time, but we want to preserve it for when we mirror so that if source cluster partition count != destination cluster partition count we can honour the same partitioning logic.

Michael,

Sorry to be off topic about the KIP, but this is genius. I'm totally stealing this idea. We currently require partition counts to be the same on source and destination cluster and do 1-to-1 mapping of partitions (i.e. partition 1 goes to partition 1, partition 2 goes to partition 2, etc) because otherwise mirrormaker would partition based off of the compaction key. Explicitly including the partition key would let us work around that.

Does that mean you have a custom partitioner in mirrormaker that uses the partitionKey field? Is it implemented inside a MirrorMakerMessageHandler?

Thanks!
-James
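One way to picture the partitionKey idea discussed above is the following sketch, which picks the destination partition from a preserved partition key instead of the record (compaction) key. Everything here is an assumption for illustration: the class and method names are hypothetical, and the hash is a simple stand-in, not the murmur2 hash that Kafka's default partitioner uses and not Michael's internal implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of partitioning by a preserved partition key during mirroring.
final class PartitionKeySketch {

    // Stand-in hash; a real deployment would reuse whatever hash the source cluster applied.
    static int partitionFor(byte[] key, int numPartitions) {
        int hash = Arrays.hashCode(key);
        // Mask the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    // Prefer the preserved partition key; fall back to the record key when it is absent.
    static int mirrorPartition(byte[] recordKey, byte[] partitionKeyHeader, int destPartitions) {
        byte[] basis = partitionKeyHeader != null ? partitionKeyHeader : recordKey;
        return partitionFor(basis, destPartitions);
    }

    public static void main(String[] args) {
        byte[] partitionKey = "account-7".getBytes(StandardCharsets.UTF_8);
        byte[] orderA = "order-1".getBytes(StandardCharsets.UTF_8);
        byte[] orderB = "order-2".getBytes(StandardCharsets.UTF_8);

        // Two records with different compaction keys but the same partition key
        // land in the same destination partition, whatever the partition count is.
        int a = mirrorPartition(orderA, partitionKey, 12);
        int b = mirrorPartition(orderB, partitionKey, 12);
        System.out.println(a == b); // prints true
    }
}
```

With this shape the partition numbers may differ between clusters of different sizes, but records sharing a partition key stay co-located on each side, which is what removes the need for a 1-to-1 partition mapping.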
Re: [DISCUSS] KIP-82 - Add Record Headers
>>> > >> third-party use case category. > > >> > >> >> >>> > >> > > >> > >> >> >>> > >> A. content-type > > >> > >> >> >>> > >> It seems that in general, content-type should be set > at > > >> the > > >> > >> topic > > >> > >> >> >>> level. > > >> > >> >> >>> > >> Not sure if mixing messages with different content > > types > > >> > >> should be > > >> > >> >> >>> > >> encouraged. > > >> > >> >> >>> > >> > > >> > >> >> >>> > >> B. schema id > > >> > >> >> >>> > >> Since the value is mostly useless without schema id, > it > > >> > seems > > >> > >> that > > >> > >> >> >>> > storing > > >> > >> >> >>> > >> the schema id together with serialized bytes in the > > value > > >> is > > >> > >> >> better? > > >> > >> >> >>> > >> > > >> > >> >> >>> > >> C. per message encryption > > >> > >> >> >>> > >> One drawback of this approach is that this > > significantly > > >> > reduce > > >> > >> >> the > > >> > >> >> >>> > >> effectiveness of compression, which happens on a set > of > > >> > >> serialized > > >> > >> >> >>> > >> messages. An alternative is to enable SSL for wire > > >> > encryption > > >> > >> and > > >> > >> >> >>> rely > > >> > >> >> >>> > on > > >> > >> >> >>> > >> the storage system (e.g. LUKS) for at rest > encryption. > > >> > >> >> >>> > >> > > >> > >> >> >>> > >> D. cluster ID for mirroring across Kafka clusters > > >> > >> >> >>> > >> This is actually interesting. Today, to avoid > > introducing > > >> > >> cycles > > >> > >> >> when > > >> > >> >> >>> > doing > > >> > >> >> >>> > >> mirroring across data centers, one would either have > to > > >> set > > >> > up > > >> > >> two > > >> > >> >> >>> Kafka > > >> > >> >> >>> > >> clusters (a local and an aggregate) per data center > or > > >> > rename > > >> > >> >> topics. > > >> > >> >> >>> > >> Neither is ideal. With headers, the producer could > tag > > >> each > > >> > >> >> message > > >> > >> >> >>> with > > >> > >> >> >>> > >> the producing cluster ID in the header. 
MirrorMaker > > could > > >> > then > > >> > >> >> avoid > > >> > >> >> >>> > >> mirroring messages to a cluster if they are tagged > with > > >> the > > >> > >> same > > >> > >> >> >>> cluster > > >> > >> >> >>> > >> id. > > >> > >> >> >>> > >> > > >> > >> >> >>> > >> However, an alternative approach is to introduce sth > > like > > >> > >> >> >>> hierarchical > > >> > >> >> >>> > >> topic and store messages from different clusters in > > >> > different > > >> > >> >> >>> partitions > > >> > >> >> >>> > >> under the sa
Re: [DISCUSS] KIP-82 - Add Record Headers
; https://cwiki.apache.org/confluence/display/KAFKA/A+ > > >> > >> >> >>> > Case+for+Kafka+Headers > > >> > >> >> >>> > >> ). > > >> > >> >> >>> > >> The following are the ones that that I understand and > > >> could > > >> > be > > >> > >> in > > >> > >> >> the > > >> > >> >> >>> > >> third-party use case category. > > >> > >> >> >>> > >> > > >> > >> >> >>> > >> A. content-type > > >> > >> >> >>> > >> It seems that in general, content-type should be set > at > > >> the > > >> > >> topic > > >> > >> >> >>> level. > > >> > >> >> >>> > >> Not sure if mixing messages with different content > > types > > >> > >> should be > > >> > >> >> >>> > >> encouraged. > > >> > >> >> >>> > >> > > >> > >> >> >>> > >> B. schema id > > >> > >> >> >>> > >> Since the value is mostly useless without schema id, > it > > >> > seems > > >> > >> that > > >> > >> >> >>> > storing > > >> > >> >> >>> > >> the schema id together with serialized bytes in the > > value > > >> is > > >> > >> >> better? > > >> > >> >> >>> > >> > > >> > >> >> >>> > >> C. per message encryption > > >> > >> >> >>> > >> One drawback of this approach is that this > > significantly > > >> > reduce > > >> > >> >> the > > >> > >> >> >>> > >> effectiveness of compression, which happens on a set > of > > >> > >> serialized > > >> > >> >> >>> > >> messages. An alternative is to enable SSL for wire > > >> > encryption > > >> > >> and > > >> > >> >> >>> rely > > >> > >> >> >>> > on > > >> > >> >> >>> > >> the storage system (e.g. LUKS) for at rest > encryption. > > >> > >> >> >>> > >> > > >> > >> >> >>> > >> D. cluster ID for mirroring across Kafka clusters > > >> > >> >> >>> > >> This is actually interesting. 
Today, to avoid > > introducing > > >> > >> cycles > > >> > >> >> when > > >> > >> >> >>> > doing > > >> > >> >> >>> > >> mirroring across data centers, one would either have > to > > >> set > > >> > up > > >> > >> two > > >> > >> >> >>> Kafka > > >> > >> >> >>> > >> clusters (a local and an aggregate) per data center > or > > >> > rename > > >> > >> >> topics. > > >> > >> >> >>> > >> Neither is ideal. With headers, the producer could > tag > > >> each > > >> > >> >> message > > >> > >> >> >>> with > > >> > >> >> >>> > >> the producing cluster ID in the header. MirrorMaker > > could > > >> > then > > >> > >> >> avoid > > >> > >> >> >>> > >> mirroring messages to a cluster if they are tagged > with > > >> the > > >> > >> same > > >> > >> >> >>> cluster > > >> > >> >> >>> > >> id. > > >> > >> >> >>> > >> > > >> > >> >> >>> > >> However, an alternative approach is to introduce sth > > like > > >> > >> >> >>> hierarchical > > >> > >> >> >>> > >> topic and store
Re: [DISCUSS] KIP-82 - Add Record Headers
t;> went through the use cases in the KIP and in Radai's > wiki > >> ( > >> > >> >> >>> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+ > >> > >> >> >>> > Case+for+Kafka+Headers > >> > >> >> >>> > >> ). > >> > >> >> >>> > >> The following are the ones that that I understand and > >> could > >> > be > >> > >> in > >> > >> >> the > >> > >> >> >>> > >> third-party use case category. > >> > >> >> >>> > >> > >> > >> >> >>> > >> A. content-type > >> > >> >> >>> > >> It seems that in general, content-type should be set at > >> the > >> > >> topic > >> > >> >> >>> level. > >> > >> >> >>> > >> Not sure if mixing messages with different content > types > >> > >> should be > >> > >> >> >>> > >> encouraged. > >> > >> >> >>> > >> > >> > >> >> >>> > >> B. schema id > >> > >> >> >>> > >> Since the value is mostly useless without schema id, it > >> > seems > >> > >> that > >> > >> >> >>> > storing > >> > >> >> >>> > >> the schema id together with serialized bytes in the > value > >> is > >> > >> >> better? > >> > >> >> >>> > >> > >> > >> >> >>> > >> C. per message encryption > >> > >> >> >>> > >> One drawback of this approach is that this > significantly > >> > reduce > >> > >> >> the > >> > >> >> >>> > >> effectiveness of compression, which happens on a set of > >> > >> serialized > >> > >> >> >>> > >> messages. An alternative is to enable SSL for wire > >> > encryption > >> > >> and > >> > >> >> >>> rely > >> > >> >> >>> > on > >> > >> >> >>> > >> the storage system (e.g. LUKS) for at rest encryption. > >> > >> >> >>> > >> > >> > >> >> >>> > >> D. cluster ID for mirroring across Kafka clusters > >> > >> >> >>> > >> This is actually interesting. 
Today, to avoid > introducing > >> > >> cycles > >> > >> >> when > >> > >> >> >>> > doing > >> > >> >> >>> > >> mirroring across data centers, one would either have to > >> set > >> > up > >> > >> two > >> > >> >> >>> Kafka > >> > >> >> >>> > >> clusters (a local and an aggregate) per data center or > >> > rename > >> > >> >> topics. > >> > >> >> >>> > >> Neither is ideal. With headers, the producer could tag > >> each > >> > >> >> message > >> > >> >> >>> with > >> > >> >> >>> > >> the producing cluster ID in the header. MirrorMaker > could > >> > then > >> > >> >> avoid > >> > >> >> >>> > >> mirroring messages to a cluster if they are tagged with > >> the > >> > >> same > >> > >> >> >>> cluster > >> > >> >> >>> > >> id. > >> > >> >> >>> > >> > >> > >> >> >>> > >> However, an alternative approach is to introduce sth > like > >> > >> >> >>> hierarchical > >> > >> >> >>> > >> topic and store messages from different clusters in > >> > different > >> > >> >> >>> partitions > >> > >> >> >>> > >> under the same topic. This approach avoids filtering &g
Re: [DISCUSS] KIP-82 - Add Record Headers
gt; >> >> >>> > > >> > >> >> >>> > > >> > >> >> >>> > > >> > >> >> >>> > > On Wed, Nov 30, 2016 at 6:50 PM, Jun Rao > > >> > >> wrote: >> > >> >> >>> > > >> > >> >> >>> > >> Hi, Michael, >> > >> >> >>> > >> >> > >> >> >>> > >> In order to answer the first two questions, it would be >> > helpful >> > >> >> if we >> > >> >> >>> > could >> > >> >> >>> > >> identify 1 or 2 strong use cases for headers in the space >> > for >> > >> >> >>> > third-party >> > >> >> >>> > >> vendors. For use cases within an organization, one could >> > always >> > >> >> use >> > >> >> >>> > other >> > >> >> >>> > >> approaches such as company-wise containers to get around >> w/o >> > >> >> >>> headers. I >> > >> >> >>> > >> went through the use cases in the KIP and in Radai's wiki >> ( >> > >> >> >>> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+ >> > >> >> >>> > Case+for+Kafka+Headers >> > >> >> >>> > >> ). >> > >> >> >>> > >> The following are the ones that that I understand and >> could >> > be >> > >> in >> > >> >> the >> > >> >> >>> > >> third-party use case category. >> > >> >> >>> > >> >> > >> >> >>> > >> A. content-type >> > >> >> >>> > >> It seems that in general, content-type should be set at >> the >> > >> topic >> > >> >> >>> level. >> > >> >> >>> > >> Not sure if mixing messages with different content types >> > >> should be >> > >> >> >>> > >> encouraged. >> > >> >> >>> > >> >> > >> >> >>> > >> B. schema id >> > >> >> >>> > >> Since the value is mostly useless without schema id, it >> > seems >> > >> that >> > >> >> >>> > storing >> > >> >> >>> > >> the schema id together with serialized bytes in the value >> is >> > >> >> better? >> > >> >> >>> > >> >> > >> >> >>> > >> C. per message encryption >> > >> >> >>> > >> One drawback of this approach is that this significantly >> > reduce >> > >> >> the >> > >> >> >>> > >> effectiveness of compression, which happens on a set of >> > >> serialized >> > >> >> >>> > >> messages. 
An alternative is to enable SSL for wire >> > encryption >> > >> and >> > >> >> >>> rely >> > >> >> >>> > on >> > >> >> >>> > >> the storage system (e.g. LUKS) for at rest encryption. >> > >> >> >>> > >> >> > >> >> >>> > >> D. cluster ID for mirroring across Kafka clusters >> > >> >> >>> > >> This is actually interesting. Today, to avoid introducing >> > >> cycles >> > >> >> when >> > >> >> >>> > doing >> > >> >> >>> > >> mirroring across data centers, one would either have to >> set >> > up >> > >> two >> > >> >> >>> Kafka >> > >> >> >>> > >> clusters (a local and an aggregate) per data center or >> > rename >> > >> >> topics. >> > >> >> >>> > >> Neither is ideal. With headers, the producer could tag >> each >> > >> >> message >> > >> >> >>> with >> > >> >> >>>
Re: [DISCUSS] KIP-82 - Add Record Headers
> >> >>> > >> In order to answer the first two questions, it would be >> > helpful >> > >> >> if we >> > >> >> >>> > could >> > >> >> >>> > >> identify 1 or 2 strong use cases for headers in the space >> > for >> > >> >> >>> > third-party >> > >> >> >>> > >> vendors. For use cases within an organization, one could >> > always >> > >> >> use >> > >> >> >>> > other >> > >> >> >>> > >> approaches such as company-wise containers to get around >> w/o >> > >> >> >>> headers. I >> > >> >> >>> > >> went through the use cases in the KIP and in Radai's wiki >> ( >> > >> >> >>> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+ >> <https://cwiki.apache.org/confluence/display/KAFKA/A+> >> > >> >> >>> > Case+for+Kafka+Headers >> > >> >> >>> > >> ). >> > >> >> >>> > >> The following are the ones that that I understand and >> could >> > be >> > >> in >> > >> >> the >> > >> >> >>> > >> third-party use case category. >> > >> >> >>> > >> >> > >> >> >>> > >> A. content-type >> > >> >> >>> > >> It seems that in general, content-type should be set at >> the >> > >> topic >> > >> >> >>> level. >> > >> >> >>> > >> Not sure if mixing messages with different content types >> > >> should be >> > >> >> >>> > >> encouraged. >> > >> >> >>> > >> >> > >> >> >>> > >> B. schema id >> > >> >> >>> > >> Since the value is mostly useless without schema id, it >> > seems >> > >> that >> > >> >> >>> > storing >> > >> >> >>> > >> the schema id together with serialized bytes in the value >> is >> > >> >> better? >> > >> >> >>> > >> >> > >> >> >>> > >> C. per message encryption >> > >> >> >>> > >> One drawback of this approach is that this significantly >> > reduce >> > >> >> the >> > >> >> >>> > >> effectiveness of compression, which happens on a set of >> > >> serialized >> > >> >> >>> > >> messages. An alternative is to enable SSL for wire >> > encryption >> > >> and >> > >> >> >>> rely >> > >> >> >>> > on >> > >> >> >>> > >> the storage system (e.g. LUKS) for at rest encryption. 
>> > >> >> >>> > >> >> > >> >> >>> > >> D. cluster ID for mirroring across Kafka clusters >> > >> >> >>> > >> This is actually interesting. Today, to avoid introducing >> > >> cycles >> > >> >> when >> > >> >> >>> > doing >> > >> >> >>> > >> mirroring across data centers, one would either have to >> set >> > up >> > >> two >> > >> >> >>> Kafka >> > >> >> >>> > >> clusters (a local and an aggregate) per data center or >> > rename >> > >> >> topics. >> > >> >> >>> > >> Neither is ideal. With headers, the producer could tag >> each >> > >> >> message >> > >> >> >>> with >> > >> >> >>> > >> the producing cluster ID in the header. MirrorMaker could >> > then >> > >> >> avoid >> > >> >> >>> > >> mirroring messages to a cluster if they are tagged with >> the >> > >> same >> > >> >> >>> cluster >> > >> >> >>> > >> id. >> > >> >> >>> >
Re: [DISCUSS] KIP-82 - Add Record Headers
>> >>> > >> encouraged. > > >> >> >>> > >> > > >> >> >>> > >> B. schema id > > >> >> >>> > >> Since the value is mostly useless without schema id, it > > seems > > >> that > > >> >> >>> > storing > > >> >> >>> > >> the schema id together with serialized bytes in the value > is > > >> >> better? > > >> >> >>> > >> > > >> >> >>> > >> C. per message encryption > > >> >> >>> > >> One drawback of this approach is that this significantly > > reduce > > >> >> the > > >> >> >>> > >> effectiveness of compression, which happens on a set of > > >> serialized > > >> >> >>> > >> messages. An alternative is to enable SSL for wire > > encryption > > >> and > > >> >> >>> rely > > >> >> >>> > on > > >> >> >>> > >> the storage system (e.g. LUKS) for at rest encryption. > > >> >> >>> > >> > > >> >> >>> > >> D. cluster ID for mirroring across Kafka clusters > > >> >> >>> > >> This is actually interesting. Today, to avoid introducing > > >> cycles > > >> >> when > > >> >> >>> > doing > > >> >> >>> > >> mirroring across data centers, one would either have to > set > > up > > >> two > > >> >> >>> Kafka > > >> >> >>> > >> clusters (a local and an aggregate) per data center or > > rename > > >> >> topics. > > >> >> >>> > >> Neither is ideal. With headers, the producer could tag > each > > >> >> message > > >> >> >>> with > > >> >> >>> > >> the producing cluster ID in the header. MirrorMaker could > > then > > >> >> avoid > > >> >> >>> > >> mirroring messages to a cluster if they are tagged with > the > > >> same > > >> >> >>> cluster > > >> >> >>> > >> id. > > >> >> >>> > >> > > >> >> >>> > >> However, an alternative approach is to introduce sth like > > >> >> >>> hierarchical > > >> >> >>> > >> topic and store messages from different clusters in > > different > > >> >> >>> partitions > > >> >> >>> > >> under the same topic. This approach avoids filtering out > > >> unneeded > > >> >> >>> data > > >> >> >>> > and > > >> >> >>> > >> makes offset preserving easier to support. 
It may make > > >> compaction > > >> >> >>> > trickier > > >> >> >>> > >> though since the same key may show up in different > > partitions. > > >> >> >>> > >> > > >> >> >>> > >> E. record-level lineage > > >> >> >>> > >> For example, a source connector could store in the message > > the > > >> >> >>> metadata > > >> >> >>> > >> (e.g. UUID) of the source record. Similarly, if a stream > job > > >> >> >>> transforms > > >> >> >>> > >> messages from topic A to topic B, the library could > include > > the > > >> >> >>> source > > >> >> >>> > >> message offset in each of the transformed message in the > > >> header. > > >> >> Not > > >> >> >>> > sure > > >> >> >>> > >> how widely useful record-level lineage is though since the > > >> >> overhead > > >> >> >>> > could > > >> >> >>> > >> be significant. > > >> >> >>> > >> > > >> >> >>> > >> F. auditing metadata > > >> >> >>> > >> We co
Re: [DISCUSS] KIP-82 - Add Record Headers
;> >> >>> > >> encouraged. > > >> >> >>> > >> > > >> >> >>> > >> B. schema id > > >> >> >>> > >> Since the value is mostly useless without schema id, it > > seems > > >> that > > >> >> >>> > storing > > >> >> >>> > >> the schema id together with serialized bytes in the value > is > > >> >> better? > > >> >> >>> > >> > > >> >> >>> > >> C. per message encryption > > >> >> >>> > >> One drawback of this approach is that this significantly > > reduce > > >> >> the > > >> >> >>> > >> effectiveness of compression, which happens on a set of > > >> serialized > > >> >> >>> > >> messages. An alternative is to enable SSL for wire > > encryption > > >> and > > >> >> >>> rely > > >> >> >>> > on > > >> >> >>> > >> the storage system (e.g. LUKS) for at rest encryption. > > >> >> >>> > >> > > >> >> >>> > >> D. cluster ID for mirroring across Kafka clusters > > >> >> >>> > >> This is actually interesting. Today, to avoid introducing > > >> cycles > > >> >> when > > >> >> >>> > doing > > >> >> >>> > >> mirroring across data centers, one would either have to > set > > up > > >> two > > >> >> >>> Kafka > > >> >> >>> > >> clusters (a local and an aggregate) per data center or > > rename > > >> >> topics. > > >> >> >>> > >> Neither is ideal. With headers, the producer could tag > each > > >> >> message > > >> >> >>> with > > >> >> >>> > >> the producing cluster ID in the header. MirrorMaker could > > then > > >> >> avoid > > >> >> >>> > >> mirroring messages to a cluster if they are tagged with > the > > >> same > > >> >> >>> cluster > > >> >> >>> > >> id. > > >> >> >>> > >> > > >> >> >>> > >> However, an alternative approach is to introduce sth like > > >> >> >>> hierarchical > > >> >> >>> > >> topic and store messages from different clusters in > > different > > >> >> >>> partitions > > >> >> >>> > >> under the same topic. This approach avoids filtering out > > >> unneeded > > >> >> >>> data > > >> >> >>> > and > > >> >> >>> > >> makes offset preserving easier to support. 
It may make > > >> compaction > > >> >> >>> > trickier > > >> >> >>> > >> though since the same key may show up in different > > partitions. > > >> >> >>> > >> > > >> >> >>> > >> E. record-level lineage > > >> >> >>> > >> For example, a source connector could store in the message > > the > > >> >> >>> metadata > > >> >> >>> > >> (e.g. UUID) of the source record. Similarly, if a stream > job > > >> >> >>> transforms > > >> >> >>> > >> messages from topic A to topic B, the library could > include > > the > > >> >> >>> source > > >> >> >>> > >> message offset in each of the transformed message in the > > >> header. > > >> >> Not > > >> >> >>> > sure > > >> >> >>> > >> how widely useful record-level lineage is though since the > > >> >> overhead > > >> >> >>> > could > > >> >> >>> > >> be significant. > > >> >> >>> > >> > > >> >> >>> > >> F. auditing metadata > > >> >> >>>
Re: [DISCUSS] KIP-82 - Add Record Headers
> >> topic and store messages from different clusters in > different > >> >> >>> partitions > >> >> >>> > >> under the same topic. This approach avoids filtering out > >> unneeded > >> >> >>> data > >> >> >>> > and > >> >> >>> > >> makes offset preserving easier to support. It may make > >> compaction > >> >> >>> > trickier > >> >> >>> > >> though since the same key may show up in different > partitions. > >> >> >>> > >> > >> >> >>> > >> E. record-level lineage > >> >> >>> > >> For example, a source connector could store in the message > the > >> >> >>> metadata > >> >> >>> > >> (e.g. UUID) of the source record. Similarly, if a stream job > >> >> >>> transforms > >> >> >>> > >> messages from topic A to topic B, the library could include > the > >> >> >>> source > >> >> >>> > >> message offset in each of the transformed message in the > >> header. > >> >> Not > >> >> >>> > sure > >> >> >>> > >> how widely useful record-level lineage is though since the > >> >> overhead > >> >> >>> > could > >> >> >>> > >> be significant. > >> >> >>> > >> > >> >> >>> > >> F. auditing metadata > >> >> >>> > >> We could put things like clientId/host/user in the header in > >> each > >> >> >>> > message > >> >> >>> > >> for auditing. These metadata are really at the producer > level > >> >> though. > >> >> >>> > So, a > >> >> >>> > >> more efficient way is to only include a "producerId" per > >> message > >> >> and > >> >> >>> > send > >> >> >>> > >> the producerId -> metadata mapping independently. KIP-98 is > >> >> actually > >> >> >>> > >> proposing including such a producerId natively in the > message. > >> >> >>> > >> > >> >> >>> > >> So, overall, I not sure that I am fully convinced of the > strong > >> >> >>> > third-party > >> >> >>> > >> use cases of headers yet. Perhaps we could discuss a bit > more > >> to > >> >> make > >> >> >>> > one > >> >> >>> > >> or two really convincing use cases. 
> >> >> >>> > >> > >> >> >>> > >> Another orthogonal question is whether header should be > >> exposed > >> >> in > >> >> >>> > stream > >> >> >>> > >> processing systems such Kafka stream, Samza, and Spark > >> streaming. > >> >> >>> > >> Currently, those systems just deal with key/value pairs. > >> Should we > >> >> >>> > expose a > >> >> >>> > >> third thing header there too or somehow map header to key or > >> >> value? > >> >> >>> > >> > >> >> >>> > >> Thanks, > >> >> >>> > >> > >> >> >>> > >> Jun > >> >> >>> > >> > >> >> >>> > >> > >> >> >>> > >> On Tue, Nov 29, 2016 at 3:35 AM, Michael Pearce < > >> >> >>> michael.pea...@ig.com> > >> >> >>> > >> wrote: > >> >> >>> > >> > >> >> >>> > >> > I assume, that after a period of a week, that there is no > >> >> concerns > >> >> >>> now > >> >> >>> > >> > with points 1, and 2 and now we have agreement that > headers > >> are > >> >> >>> useful > >> >> >>> > >> and > >> >> >>> > >> > needed in Kafka. As such if put to a KIP vote, this > wouldn’
Re: [DISCUSS] KIP-82 - Add Record Headers
> ; on the storage system (e.g. LUKS) for at-rest encryption.
>
> D. cluster ID for mirroring across Kafka clusters
> This is actually interesting. Today, to avoid introducing cycles when mirroring across data centers, one would either have to set up two Kafka clusters (a local and an aggregate) per data center or rename topics. Neither is ideal. With headers, the producer could tag each message with the producing cluster ID in the header. MirrorMaker could then avoid mirroring messages to a cluster if they are tagged with the same cluster ID.
>
> However, an alternative approach is to introduce something like hierarchical topics and store messages from different clusters in different partitions under the same topic. This approach avoids filtering out unneeded data and makes offset preservation easier to support. It may make compaction trickier, though, since the same key may show up in different partitions.
>
> E. record-level lineage
> For example, a source connector could store in the message the metadata (e.g. UUID) of the source record. Similarly, if a stream job transforms messages from topic A to topic B, the library could include the source message offset in the header of each transformed message. I am not sure how widely useful record-level lineage is, though, since the overhead could be significant.
>
> F. auditing metadata
> We could put things like clientId/host/user in the header of each message for auditing. This metadata is really at the producer level, though. So a more efficient way is to include only a "producerId" per message and send the producerId -> metadata mapping independently. KIP-98 is actually proposing including such a producerId natively in the message.
>
> So, overall, I am not sure that I am fully convinced of the strong third-party use cases for headers yet. Perhaps we could discuss a bit more to establish one or two really convincing use cases.
>
> Another orthogonal question is whether headers should be exposed in stream processing systems such as Kafka Streams, Samza, and Spark Streaming. Currently, those systems deal only with key/value pairs. Should we expose headers there as a third element, or somehow map them to the key or value?
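To make use case D concrete, here is a minimal sketch of the tagging and filtering logic described above. It is plain Java and deliberately avoids any Kafka API, since the header API was still under discussion in this thread. The class name, the header key `origin-cluster-id`, and the use of a `Map<String, byte[]>` to stand in for headers are all assumptions made for illustration, not anything proposed in the KIP.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these names come from the KIP or from Kafka's API.
final class ClusterTagMirrorFilter {
    // Assumed header key; the thread does not name one.
    static final String ORIGIN_HEADER = "origin-cluster-id";

    // Producer side: tag a copy of the headers with the producing cluster ID.
    static Map<String, byte[]> tag(Map<String, byte[]> headers, String clusterId) {
        Map<String, byte[]> tagged = new HashMap<>(headers);
        // Keep an existing tag so the original producing cluster survives re-mirroring.
        tagged.putIfAbsent(ORIGIN_HEADER, clusterId.getBytes(StandardCharsets.UTF_8));
        return tagged;
    }

    // Mirror side: skip a record whose origin tag equals the target cluster ID.
    static boolean shouldMirror(Map<String, byte[]> headers, String targetClusterId) {
        byte[] origin = headers.get(ORIGIN_HEADER);
        if (origin == null) {
            return true; // untagged records are mirrored (an assumption of this sketch)
        }
        return !new String(origin, StandardCharsets.UTF_8).equals(targetClusterId);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = tag(new HashMap<>(), "dc-east");
        System.out.println(shouldMirror(headers, "dc-west")); // different cluster
        System.out.println(shouldMirror(headers, "dc-east")); // same cluster, would form a cycle
    }
}
```

The filter runs on the mirroring side, so records that would loop back are still read and then discarded. That is the "filtering out unneeded data" cost the hierarchical-topic alternative is said to avoid.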
Re: [DISCUSS] KIP-82 - Add Record Headers
> sage the metadata (e.g. UUID) of the source record. Similarly, if a stream job transforms messages from topic A to topic B, the library could include the source message offset in the header of each transformed message. I am not sure how widely useful record-level lineage is, though, since the overhead could be significant.
>
> F. auditing metadata
> We could put things like clientId/host/user in the header of each message for auditing. This metadata is really at the producer level, though. So a more efficient way is to include only a "producerId" per message and send the producerId -> metadata mapping independently. KIP-98 is actually proposing including such a producerId natively in the message.
>
> So, overall, I am not sure that I am fully convinced of the strong third-party use cases for headers yet. Perhaps we could discuss a bit more to establish one or two really convincing use cases.
>
> Another orthogonal question is whether headers should be exposed in stream processing systems such as Kafka Streams, Samza, and Spark Streaming. Currently, those systems deal only with key/value pairs. Should we expose headers there as a third element, or somehow map them to the key or value?
>
> Thanks,
>
> Jun
>
> On Tue, Nov 29, 2016 at 3:35 AM, Michael Pearce <michael.pea...@ig.com> wrote:
>
> > I assume that, after a week, there are no remaining concerns with points 1 and 2, and that we now agree headers are useful and needed in Kafka. As such, if put to a KIP vote, this wouldn’t be a reason to reject.
> >
> > @Ignacio on point 4):
> > I think, for the purpose of getting this KIP moving past this, we can state that the key will be a 4-byte space that will naturally be interpreted as an Int32 (if namespacing is wanted later, you can easily split this into two int16 spaces). I don’t believe this makes any difference to the wire protocol implementation. Is this reasonable to all?
> >
> > On 5): as per point 4, I am therefore happy that we keep 32 bits.
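The remark that a 4-byte key can later be split into two int16 spaces can be illustrated with a small sketch. It packs a 16-bit namespace and a 16-bit ID into a single Int32 and unpacks them again. The class and method names are hypothetical, and treating both halves as unsigned 16-bit values is an assumption of this sketch; the thread does not specify signedness.

```java
// Hypothetical sketch of splitting a 32-bit header key into two 16-bit spaces.
final class HeaderKeyNamespace {
    // Pack a namespace (high 16 bits) and an ID (low 16 bits) into one 32-bit key.
    static int pack(int namespace, int id) {
        if (namespace < 0 || namespace > 0xFFFF || id < 0 || id > 0xFFFF) {
            throw new IllegalArgumentException("namespace and id must fit in 16 bits");
        }
        return (namespace << 16) | id;
    }

    // Recover the namespace; the unsigned shift keeps the result in 0..65535.
    static int namespace(int key) {
        return key >>> 16;
    }

    // Recover the ID from the low 16 bits.
    static int id(int key) {
        return key & 0xFFFF;
    }

    public static void main(String[] args) {
        int key = pack(3, 7);
        System.out.println(namespace(key)); // prints 3
        System.out.println(id(key));        // prints 7
    }
}
```

On the wire the key is still four bytes either way, which is consistent with the point that the split makes no difference to the protocol implementation.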
Re: [DISCUSS] KIP-82 - Add Record Headers
> better?
>
> C. per message encryption
> One drawback of this approach is that it significantly reduces the effectiveness of compression, which happens on a set of serialized messages. An alternative is to enable SSL for wire encryption and rely on the storage system (e.g. LUKS) for at-rest encryption.
>
> D. cluster ID for mirroring across Kafka clusters
> This is actually interesting. Today, to avoid introducing cycles when mirroring across data centers, one would either have to set up two Kafka clusters (a local and an aggregate) per data center or rename topics. Neither is ideal. With headers, the producer could tag each message with the producing cluster ID in the header. MirrorMaker could then avoid mirroring messages to a cluster if they are tagged with the same cluster ID.
>
> However, an alternative approach is to introduce something like hierarchical topics and store messages from different clusters in different partitions under the same topic. This approach avoids filtering out unneeded data and makes offset preservation easier to support. It may make compaction trickier, though, since the same key may show up in different partitions.
>
> E. record-level lineage
> For example, a source connector could store in the message the metadata (e.g. UUID) of the source record. Similarly, if a stream job transforms messages from topic A to topic B, the library could include the source message offset in the header of each transformed message. I am not sure how widely useful record-level lineage is, though, since the overhead could be significant.
>
> F. auditing metadata
> We could put things like clientId/host/user in the header of each message for auditing. This metadata is really at the producer level, though. So a more efficient way is to include only a "producerId" per message and send the producerId -> metadata mapping independently. KIP-98 is actually proposing including such a producerId natively in the message.
>
> So, overall, I am not sure that I am fully convinced of the strong third-party use cases for headers yet. Perhaps we could discuss a bit more to establish one or two really convincing use cases.
>
> Another orthogonal question is whether headers should be exposed in stream processing systems such as Kafka Streams, Samza, and Spark Streaming. Currently, those systems deal only with key/value pairs. Should we expose headers there as a third element, or somehow map them to the key or value?
>
> Thanks,
>
> Jun
>
> On Tue, Nov 2
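The alternative raised under use case F, carrying only a producerId per message and sending the producerId -> metadata mapping separately, can be sketched as a simple lookup table on the auditing side. Everything here is hypothetical: the class names, the fields, and the use of a `long` for the producer ID are assumptions of this sketch, not details taken from KIP-82 or KIP-98.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: an auditing-side registry keyed by producer ID.
final class ProducerAuditRegistry {
    // Producer-level metadata that would otherwise be repeated in every message.
    static final class ProducerMetadata {
        final String clientId;
        final String host;
        final String user;

        ProducerMetadata(String clientId, String host, String user) {
            this.clientId = clientId;
            this.host = host;
            this.user = user;
        }
    }

    private final Map<Long, ProducerMetadata> byProducerId = new ConcurrentHashMap<>();

    // Called once per producer, when the mapping is sent independently of the messages.
    void register(long producerId, ProducerMetadata metadata) {
        byProducerId.put(producerId, metadata);
    }

    // Called per message by the auditor, using only the producerId carried in the message.
    Optional<ProducerMetadata> lookup(long producerId) {
        return Optional.ofNullable(byProducerId.get(producerId));
    }

    public static void main(String[] args) {
        ProducerAuditRegistry registry = new ProducerAuditRegistry();
        registry.register(42L, new ProducerMetadata("orders-app", "host-1", "alice"));
        registry.lookup(42L).ifPresent(m -> System.out.println(m.clientId + "@" + m.host));
    }
}
```

The per-message cost then drops to a single fixed-size ID, at the price of the auditor needing the mapping to be available before it can resolve a message.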